From dc37f0532a06e11c564a6debec5baa23ecaf7761 Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Thu, 16 Mar 2017 19:35:10 -0700 Subject: [PATCH 1/9] relocate method in BufferAggregator. --- .../theta/SketchBufferAggregator.java | 42 +++++++-- .../datasketches/theta/SketchHolder.java | 12 +++ ...BufferGrouperTestForSketchAggregation.java | 91 +++++++++++++++++++ .../theta/SketchAggregationTest.java | 20 ++++ .../query/aggregation/BufferAggregator.java | 13 +++ .../groupby/epinephelinae/BufferGrouper.java | 10 +- .../aggregation/AggregationTestHelper.java | 29 ++++++ 7 files changed, 207 insertions(+), 10 deletions(-) create mode 100644 extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperTestForSketchAggregation.java diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index c36f908d339a..3e6c0f6d314d 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Map; public class SketchBufferAggregator implements BufferAggregator @@ -38,10 +39,8 @@ public class SketchBufferAggregator implements BufferAggregator private final ObjectColumnSelector selector; private final int size; private final int maxIntermediateSize; - - private NativeMemory nm; - private final Map unions = new HashMap<>(); //position in BB -> Union Object + private IdentityHashMap nmCache = new IdentityHashMap<>(); public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize) { @@ -53,11 +52,8 @@ public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIn @Override public void init(ByteBuffer buf, int position) { - if (nm == null) { - nm = new NativeMemory(buf); - } - - Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + NativeMemory nativeMemory = getNativeMemory(buf); + Memory mem = new MemoryRegion(nativeMemory, position, maxIntermediateSize); unions.put(position, (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION)); } @@ -89,6 +85,7 @@ private Union getUnion(ByteBuffer buf, int position) { Union union = unions.get(position); if (union == null) { + NativeMemory nm = getNativeMemory(buf); Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); union = (Union) SetOperation.wrap(mem); unions.put(position, union); @@ -119,4 +116,33 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("selector", selector); } + + @Override + public void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer) + { + NativeMemory nm = getNativeMemory(newBuffer); + + Memory mem = new MemoryRegion(nm, newPosition, maxIntermediateSize); + Union newUnion = (Union) SetOperation.wrap(mem); + + Union union = unions.get(oldPosition); + if (union != null) { + unions.remove(oldPosition); + unions.put(newPosition, newUnion); + } else { + unions.put(newPosition, newUnion); + } + + } + + private NativeMemory getNativeMemory(ByteBuffer buffer) + { + NativeMemory nm = nmCache.get(buffer); + if (nm == null) { + nm = new NativeMemory(buffer); + nmCache.put(buffer, nm); + } + return nm; + } + } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java index 54fa865d7970..b888335764fd 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java @@ -290,4 +290,16 @@ public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object. throw new IllegalArgumentException("Unknown sketch operation " + func); } } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return this.getSketch().equals(((SketchHolder) o).getSketch()); + } } diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperTestForSketchAggregation.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperTestForSketchAggregation.java new file mode 100644 index 000000000000..c9971fbd74bd --- /dev/null +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperTestForSketchAggregation.java @@ -0,0 +1,91 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.theta; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.yahoo.sketches.theta.Sketches; +import com.yahoo.sketches.theta.UpdateSketch; +import io.druid.data.input.MapBasedRow; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.groupby.epinephelinae.BufferGrouper; +import io.druid.query.groupby.epinephelinae.Grouper; +import io.druid.query.groupby.epinephelinae.GrouperTestUtil; +import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class BufferGrouperTestForSketchAggregation +{ + private static BufferGrouper makeGrouper( + TestColumnSelectorFactory columnSelectorFactory, + int bufferSize, + int initialBuckets + ) + { + final BufferGrouper grouper = new BufferGrouper<>( + Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)), + GrouperTestUtil.intKeySerde(), + columnSelectorFactory, + new AggregatorFactory[]{ + new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2), + new CountAggregatorFactory("count") + }, + Integer.MAX_VALUE, + 0.75f, + initialBuckets + ); + grouper.init(); + return grouper; + } + + @Test + public void testGrowingBufferGrouper() + { + final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + final Grouper grouper = makeGrouper(columnSelectorFactory, 100000, 2); + final int expectedMaxSize = 5; + + SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); + UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); + updateSketch.update(1); + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); + + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + } + + updateSketch.update(3); + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); + + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + } + + Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues(); + + Assert.assertEquals(2.0d, ((SketchHolder) holders[0]).getEstimate(), 0); + } +} diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index e6167c151497..3dd699bec0f0 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -28,6 +28,7 @@ import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.theta.UpdateSketch; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.java.util.common.granularity.Granularities; @@ -39,6 +40,8 @@ import io.druid.query.aggregation.post.FieldAccessPostAggregator; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryRunnerTest; +import io.druid.query.groupby.epinephelinae.GrouperTestUtil; +import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; @@ -389,6 +392,23 @@ public void testSketchAggregatorFactoryComparator() Assert.assertEquals(1, comparator.compare(SketchHolder.of(union2), SketchHolder.of(sketch1))); } + @Test + public void testRelocation() + { + final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); + UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); + updateSketch.update(1); + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); + SketchHolder[] holders = helper.runRelocateVerificationTest( + new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2), + columnSelectorFactory, + SketchHolder.class + ); + Assert.assertEquals(holders[0].getEstimate(), holders[1].getEstimate(), 0); + } + private void assertPostAggregatorSerde(PostAggregator agg) throws Exception { Assert.assertEquals( diff --git a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java index 3b565dcce2d2..dda5a0205017 100644 --- a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java @@ -126,4 +126,17 @@ public interface BufferAggregator extends HotLoopCallee default void inspectRuntimeShape(RuntimeShapeInspector inspector) { } + + /* + * Relocates any cached objects. + * Implementations must not change the position, limit or mark of the given buffer + * + * @param oldPostition old position of an item in old ByteBuffer. + * @param newPosition new position of an item in new ByteBuffer. + * @param newBuffer ByteBuffer to be used. + */ + default void relocate(int oldPostition, int newPosition, ByteBuffer newBuffer) + { + } + } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index 4987d34b6b14..350593996619 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -430,8 +430,9 @@ private void growIfPossible() for (int oldBucket = 0; oldBucket < buckets; oldBucket++) { if (isUsed(oldBucket)) { + int oldPosition = oldBucket * bucketSize; entryBuffer.limit((oldBucket + 1) * bucketSize); - entryBuffer.position(oldBucket * bucketSize); + entryBuffer.position(oldPosition); keyBuffer.limit(entryBuffer.position() + HASH_SIZE + keySize); keyBuffer.position(entryBuffer.position() + HASH_SIZE); @@ -442,9 +443,14 @@ private void growIfPossible() throw new ISE("WTF?! Couldn't find a bucket while resizing?!"); } - newTableBuffer.position(newBucket * bucketSize); + int newPostition = newBucket * bucketSize; + newTableBuffer.position(newPostition); newTableBuffer.put(entryBuffer); + for (int i = 0; i < aggregators.length; i++) { + aggregators[i].relocate(oldPosition + aggregatorOffsets[i], newPostition + aggregatorOffsets[i], newTableBuffer); + } + buffer.putInt(tableArenaSize + newSize * Ints.BYTES, newBucket * bucketSize); newSize++; } diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index e3eac6e43bf4..6d40cc48d1c6 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -67,6 +67,7 @@ import io.druid.query.topn.TopNQueryConfig; import io.druid.query.topn.TopNQueryQueryToolChest; import io.druid.query.topn.TopNQueryRunnerFactory; +import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; @@ -78,12 +79,14 @@ import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; +import org.junit.Assert; import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; @@ -591,5 +594,31 @@ public ObjectMapper getObjectMapper() { return mapper; } + + public T[] runRelocateVerificationTest( + AggregatorFactory factory, + ColumnSelectorFactory selector, + Class clazz + ) + { + T[] results = (T[]) Array.newInstance(clazz, 2); + BufferAggregator agg = factory.factorizeBuffered(selector); + ByteBuffer myBuf = ByteBuffer.allocate(10040902); + agg.init(myBuf, 0); + agg.aggregate(myBuf, 0); + results[0] = (T) agg.get(myBuf, 0); + + byte[] theBytes = new byte[factory.getMaxIntermediateSize()]; + myBuf.get(theBytes); + + ByteBuffer newBuf = ByteBuffer.allocate(941209); + newBuf.position(7574); + newBuf.put(theBytes); + newBuf.position(0); + + agg.relocate(0, 7574, newBuf); + results[1] = (T) agg.get(newBuf, 7574); + return results; + } } From a0d7a0ba21d01d2cba413eaeeff0981ac5ca1b51 Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Thu, 16 Mar 2017 19:44:27 -0700 Subject: [PATCH 2/9] Unused import. --- .../java/io/druid/query/aggregation/AggregationTestHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 6d40cc48d1c6..9a725fd5551f 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -79,7 +79,6 @@ import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; -import org.junit.Assert; import org.junit.rules.TemporaryFolder; import java.io.File; From a24e3a398c9479fa990f9b831b4ef503fed83be7 Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Thu, 16 Mar 2017 23:51:58 -0700 Subject: [PATCH 3/9] Detailed javadoc. --- .../datasketches/theta/SketchBufferAggregator.java | 10 +++------- ...ouperUsingSketchMergeAggregatorFactoryTest.java} | 2 +- .../druid/query/aggregation/BufferAggregator.java | 13 ++++++++++--- 3 files changed, 14 insertions(+), 11 deletions(-) rename extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/{BufferGrouperTestForSketchAggregation.java => BufferGrouperUsingSketchMergeAggregatorFactoryTest.java} (98%) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 3e6c0f6d314d..51fea886c3b5 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -40,7 +40,7 @@ public class SketchBufferAggregator implements BufferAggregator private final int size; private final int maxIntermediateSize; private final Map unions = new HashMap<>(); //position in BB -> Union Object - private IdentityHashMap nmCache = new IdentityHashMap<>(); + private final IdentityHashMap nmCache = new IdentityHashMap<>(); public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize) { @@ -126,12 +126,8 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer) Union newUnion = (Union) SetOperation.wrap(mem); Union union = unions.get(oldPosition); - if (union != null) { - unions.remove(oldPosition); - unions.put(newPosition, newUnion); - } else { - unions.put(newPosition, newUnion); - } + unions.remove(oldPosition); + unions.put(newPosition, newUnion); } diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperTestForSketchAggregation.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java similarity index 98% rename from extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperTestForSketchAggregation.java rename to extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java index c9971fbd74bd..0637cc13ed4c 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperTestForSketchAggregation.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java @@ -36,7 +36,7 @@ import java.nio.ByteBuffer; -public class BufferGrouperTestForSketchAggregation +public class BufferGrouperUsingSketchMergeAggregatorFactoryTest { private static BufferGrouper makeGrouper( TestColumnSelectorFactory columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java index dda5a0205017..d2805191a6c8 100644 --- a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java @@ -129,11 +129,18 @@ default void inspectRuntimeShape(RuntimeShapeInspector inspector) /* * Relocates any cached objects. + * If underlying ByteBuffer used for aggregation buffer relocates to a new ByteBuffer, positional caches(if any) + * built on top of old ByteBuffer can not be used for further {@link BufferAggregator#aggregate(ByteBuffer, int)} + * calls. This method tells the BufferAggregator that the cached objects at a certain location has been relocated to + * a different location. + * + * Only used if there is any positional caches/objects in the BufferAggregator implementation. + * * Implementations must not change the position, limit or mark of the given buffer * - * @param oldPostition old position of an item in old ByteBuffer. - * @param newPosition new position of an item in new ByteBuffer. - * @param newBuffer ByteBuffer to be used. + * @param oldPostition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. + * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. + * @param newBuffer new aggregation buffer. */ default void relocate(int oldPostition, int newPosition, ByteBuffer newBuffer) { From 34da4fe1bbf2fba82baa83704a3ceddbe682aa27 Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Fri, 17 Mar 2017 18:24:29 -0700 Subject: [PATCH 4/9] using Int2ObjectMap. --- .../theta/SketchBufferAggregator.java | 7 ++-- ...UsingSketchMergeAggregatorFactoryTest.java | 36 +++++++++++-------- .../query/aggregation/BufferAggregator.java | 4 +-- .../groupby/epinephelinae/BufferGrouper.java | 10 ++++-- 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 51fea886c3b5..041fc82711aa 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -28,18 +28,18 @@ import io.druid.query.aggregation.BufferAggregator; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ObjectColumnSelector; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.IdentityHashMap; -import java.util.Map; public class SketchBufferAggregator implements BufferAggregator { private final ObjectColumnSelector selector; private final int size; private final int maxIntermediateSize; - private final Map unions = new HashMap<>(); //position in BB -> Union Object + private final Int2ObjectMap unions = new Int2ObjectOpenHashMap<>(); private final IdentityHashMap nmCache = new IdentityHashMap<>(); public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize) @@ -125,7 +125,6 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer) Memory mem = new MemoryRegion(nm, newPosition, maxIntermediateSize); Union newUnion = (Union) SetOperation.wrap(mem); - Union union = unions.get(oldPosition); unions.remove(oldPosition); unions.put(newPosition, newUnion); diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java index 0637cc13ed4c..2d8f6e6ae7af 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java @@ -65,27 +65,33 @@ public void testGrowingBufferGrouper() { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); final Grouper grouper = makeGrouper(columnSelectorFactory, 100000, 2); - final int expectedMaxSize = 5; + try { + final int expectedMaxSize = 5; - SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); - UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); - updateSketch.update(1); + SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); + UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); + updateSketch.update(1); - columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); - for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); - } + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + } - updateSketch.update(3); - columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); + updateSketch.update(3); + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); - for (int i = 0; i < expectedMaxSize; i++) { - Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); - } + for (int i = 0; i < expectedMaxSize; i++) { + Assert.assertTrue(String.valueOf(i), grouper.aggregate(i)); + } - Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues(); + Object[] holders = Lists.newArrayList(grouper.iterator(true)).get(0).getValues(); - Assert.assertEquals(2.0d, ((SketchHolder) holders[0]).getEstimate(), 0); + Assert.assertEquals(2.0d, ((SketchHolder) holders[0]).getEstimate(), 0); + } + finally { + grouper.close(); + } } + } diff --git a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java index d2805191a6c8..d72c85d74e17 100644 --- a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java @@ -138,11 +138,11 @@ default void inspectRuntimeShape(RuntimeShapeInspector inspector) * * Implementations must not change the position, limit or mark of the given buffer * - * @param oldPostition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. + * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. * @param newBuffer new aggregation buffer. */ - default void relocate(int oldPostition, int newPosition, ByteBuffer newBuffer) + default void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer) { } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index 350593996619..bd63b49f396e 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -443,12 +443,16 @@ private void growIfPossible() throw new ISE("WTF?! Couldn't find a bucket while resizing?!"); } - int newPostition = newBucket * bucketSize; - newTableBuffer.position(newPostition); + int newPosition = newBucket * bucketSize; + newTableBuffer.position(newPosition); newTableBuffer.put(entryBuffer); for (int i = 0; i < aggregators.length; i++) { - aggregators[i].relocate(oldPosition + aggregatorOffsets[i], newPostition + aggregatorOffsets[i], newTableBuffer); + aggregators[i].relocate( + oldPosition + aggregatorOffsets[i], + newPosition + aggregatorOffsets[i], + newTableBuffer + ); } buffer.putInt(tableArenaSize + newSize * Ints.BYTES, newBucket * bucketSize); From ef92b17e9d73f9c528e8d1ecd9ba265989fe60a6 Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Mon, 20 Mar 2017 18:03:20 -0700 Subject: [PATCH 5/9] batch relocate. --- .../theta/SketchBufferAggregator.java | 32 +++++++++++++------ .../oldapi/OldApiSketchAggregationTest.java | 22 +++++++++++++ .../query/aggregation/BufferAggregator.java | 11 +++++-- .../groupby/epinephelinae/BufferGrouper.java | 19 +++++++++-- .../aggregation/AggregationTestHelper.java | 6 ++-- 5 files changed, 72 insertions(+), 18 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 041fc82711aa..1181a4af5f16 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -33,13 +33,14 @@ import java.nio.ByteBuffer; import java.util.IdentityHashMap; +import java.util.Map; public class SketchBufferAggregator implements BufferAggregator { private final ObjectColumnSelector selector; private final int size; private final int maxIntermediateSize; - private final Int2ObjectMap unions = new Int2ObjectOpenHashMap<>(); + private final IdentityHashMap> unions = new IdentityHashMap<>(); private final IdentityHashMap nmCache = new IdentityHashMap<>(); public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize) @@ -53,8 +54,13 @@ public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIn public void init(ByteBuffer buf, int position) { NativeMemory nativeMemory = getNativeMemory(buf); + Int2ObjectMap unionMap = unions.get(buf); + if (unionMap == null) { + unionMap = new Int2ObjectOpenHashMap<>(); + } Memory mem = new MemoryRegion(nativeMemory, position, maxIntermediateSize); - unions.put(position, (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION)); + unionMap.put(position, (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION)); + unions.put(buf, unionMap); } @Override @@ -83,12 +89,14 @@ public Object get(ByteBuffer buf, int position) //Note that this is not threadsafe and I don't think it needs to be private Union getUnion(ByteBuffer buf, int position) { - Union union = unions.get(position); + Union union = unions.get(buf) != null ? unions.get(buf).get(position) : null; if (union == null) { NativeMemory nm = getNativeMemory(buf); Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); union = (Union) SetOperation.wrap(mem); - unions.put(position, union); + Int2ObjectMap unionMap = unions.get(buf) != null ? unions.get(buf) : new Int2ObjectOpenHashMap<>(); + unionMap.put(position, union); + unions.put(buf, unionMap); } return union; } @@ -118,16 +126,20 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) } @Override - public void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer) + public void relocate(Map oldToNewPositionMap, ByteBuffer newBuffer, ByteBuffer oldBuffer) { + unions.clear(); NativeMemory nm = getNativeMemory(newBuffer); + Int2ObjectMap unionMap = new Int2ObjectOpenHashMap<>(); - Memory mem = new MemoryRegion(nm, newPosition, maxIntermediateSize); - Union newUnion = (Union) SetOperation.wrap(mem); - - unions.remove(oldPosition); - unions.put(newPosition, newUnion); + for (Map.Entry entry : oldToNewPositionMap.entrySet()) { + int newPosition = entry.getValue(); + Memory mem = new MemoryRegion(nm, newPosition, maxIntermediateSize); + Union newUnion = (Union) SetOperation.wrap(mem); + unionMap.put(newPosition, newUnion); + } + unions.put(newBuffer, unionMap); } private NativeMemory getNativeMemory(ByteBuffer buffer) diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java index 7821876e0298..39f2812ab5c3 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchAggregationTest.java @@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Files; +import com.yahoo.sketches.theta.Sketches; +import com.yahoo.sketches.theta.UpdateSketch; import io.druid.data.input.MapBasedRow; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; @@ -29,9 +31,12 @@ import io.druid.query.aggregation.AggregationTestHelper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; +import io.druid.query.aggregation.datasketches.theta.SketchHolder; import io.druid.query.aggregation.post.FieldAccessPostAggregator; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryRunnerTest; +import io.druid.query.groupby.epinephelinae.GrouperTestUtil; +import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; @@ -194,6 +199,23 @@ public void testSketchSetPostAggregatorSerde() throws Exception ); } + @Test + public void testRelocation() + { + final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + SketchHolder sketchHolder = SketchHolder.of(Sketches.updateSketchBuilder().build(16)); + UpdateSketch updateSketch = (UpdateSketch) sketchHolder.getSketch(); + updateSketch.update(1); + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder))); + SketchHolder[] holders = helper.runRelocateVerificationTest( + new OldSketchMergeAggregatorFactory("sketch", "sketch", 16, false), + columnSelectorFactory, + SketchHolder.class + ); + Assert.assertEquals(holders[0].getEstimate(), holders[1].getEstimate(), 0); + } + private void assertPostAggregatorSerde(PostAggregator agg) throws Exception { Assert.assertEquals( diff --git a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java index d72c85d74e17..4248d0ab0d65 100644 --- a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java @@ -24,6 +24,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import java.nio.ByteBuffer; +import java.util.Map; /** * A BufferAggregator is an object that can aggregate metrics into a ByteBuffer. Its aggregation-related methods @@ -136,13 +137,17 @@ default void inspectRuntimeShape(RuntimeShapeInspector inspector) * * Only used if there is any positional caches/objects in the BufferAggregator implementation. * + * If relocate happens to be across multiple new ByteBuffers (say n ByteBuffers), this method should be called + * multiple times(n times) given all the new positions/old positions should exist in newBuffer/OldBuffer. + * * Implementations must not change the position, limit or mark of the given buffer * - * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. - * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. + * @param oldToNewPositionMap old position of a cached object to new Position of a cached object before and after + * aggregation buffer relocates to a new ByteBuffer. + * @param newBuffer new aggregation buffer. * @param newBuffer new aggregation buffer. */ - default void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer) + default void relocate(Map oldToNewPositionMap, ByteBuffer newBuffer, ByteBuffer oldBuffer) { } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index bd63b49f396e..742641f31008 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -30,10 +30,13 @@ import java.nio.ByteBuffer; import java.util.AbstractList; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; /** * Grouper based around a hash table and companion array in a single ByteBuffer. Not thread-safe. @@ -428,6 +431,12 @@ private void growIfPossible() final ByteBuffer entryBuffer = tableBuffer.duplicate(); final ByteBuffer keyBuffer = tableBuffer.duplicate(); + List> reloationMaps = new ArrayList<>(aggregators.length); + for (int i = 0; i < aggregators.length ; i++) + { + reloationMaps.add(new HashMap(buckets)); + } + for (int oldBucket = 0; oldBucket < buckets; oldBucket++) { if (isUsed(oldBucket)) { int oldPosition = oldBucket * bucketSize; @@ -448,10 +457,9 @@ private void growIfPossible() newTableBuffer.put(entryBuffer); for (int i = 0; i < aggregators.length; i++) { - aggregators[i].relocate( + reloationMaps.get(i).put( oldPosition + aggregatorOffsets[i], - newPosition + aggregatorOffsets[i], - newTableBuffer + newPosition + aggregatorOffsets[i] ); } @@ -460,6 +468,11 @@ private void growIfPossible() } } + for (int i = 0; i < aggregators.length ; i++) + { + aggregators[i].relocate(reloationMaps.get(i), newTableBuffer, tableBuffer); + } + buckets = newBuckets; maxSize = newMaxSize; tableBuffer = newTableBuffer; diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 9a725fd5551f..e1c0db96fa0c 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -88,6 +88,7 @@ import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -614,8 +615,9 @@ public T[] runRelocateVerificationTest( newBuf.position(7574); newBuf.put(theBytes); newBuf.position(0); - - agg.relocate(0, 7574, newBuf); + Map oldToNewPositions = new HashMap<>(); + oldToNewPositions.put(0, 7574); + agg.relocate(oldToNewPositions, newBuf, myBuf); results[1] = (T) agg.get(newBuf, 7574); return results; } From 078f3d143ef3132d09025d75c65a3114f97e9252 Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Tue, 21 Mar 2017 09:29:22 -0700 Subject: [PATCH 6/9] Revert batch relocate. --- .../theta/SketchBufferAggregator.java | 45 +++++++------------ .../query/aggregation/BufferAggregator.java | 6 +-- .../groupby/epinephelinae/BufferGrouper.java | 20 ++------- .../aggregation/AggregationTestHelper.java | 4 +- 4 files changed, 25 insertions(+), 50 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 1181a4af5f16..f9ecf6c79b7b 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -33,7 +33,6 @@ import java.nio.ByteBuffer; import java.util.IdentityHashMap; -import java.util.Map; public class SketchBufferAggregator implements BufferAggregator { @@ -53,14 +52,7 @@ public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIn @Override public void init(ByteBuffer buf, int position) { - NativeMemory nativeMemory = getNativeMemory(buf); - Int2ObjectMap unionMap = unions.get(buf); - if (unionMap == null) { - unionMap = new Int2ObjectOpenHashMap<>(); - } - Memory mem = new MemoryRegion(nativeMemory, position, maxIntermediateSize); - unionMap.put(position, (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION)); - unions.put(buf, unionMap); + createNewUnion(buf, position, false); } @Override @@ -91,16 +83,24 @@ private Union getUnion(ByteBuffer buf, int position) { Union union = unions.get(buf) != null ? unions.get(buf).get(position) : null; if (union == null) { - NativeMemory nm = getNativeMemory(buf); - Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); - union = (Union) SetOperation.wrap(mem); - Int2ObjectMap unionMap = unions.get(buf) != null ? unions.get(buf) : new Int2ObjectOpenHashMap<>(); - unionMap.put(position, union); - unions.put(buf, unionMap); + union = createNewUnion(buf, position, true); } return union; } + private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) + { + NativeMemory nm = getNativeMemory(buf); + Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + Union union = isWrapped + ? (Union) SetOperation.wrap(mem) + : (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION); + Int2ObjectMap unionMap = unions.get(buf) != null ? unions.get(buf) : new Int2ObjectOpenHashMap<>(); + unionMap.put(position, union); + unions.put(buf, unionMap); + return union; + } + @Override public float getFloat(ByteBuffer buf, int position) { @@ -126,20 +126,9 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) } @Override - public void relocate(Map oldToNewPositionMap, ByteBuffer newBuffer, ByteBuffer oldBuffer) + public void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer, ByteBuffer oldBuffer) { - unions.clear(); - NativeMemory nm = getNativeMemory(newBuffer); - Int2ObjectMap unionMap = new Int2ObjectOpenHashMap<>(); - - for (Map.Entry entry : oldToNewPositionMap.entrySet()) { - int newPosition = entry.getValue(); - Memory mem = new MemoryRegion(nm, newPosition, maxIntermediateSize); - Union newUnion = (Union) SetOperation.wrap(mem); - unionMap.put(newPosition, newUnion); - } - - unions.put(newBuffer, unionMap); + createNewUnion(newBuffer, newPosition, true); } private NativeMemory getNativeMemory(ByteBuffer buffer) diff --git a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java index 4248d0ab0d65..f2a788ebd03f 100644 --- a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java @@ -142,12 +142,12 @@ default void inspectRuntimeShape(RuntimeShapeInspector inspector) * * Implementations must not change the position, limit or mark of the given buffer * - * @param oldToNewPositionMap old position of a cached object to new Position of a cached object before and after - * aggregation buffer relocates to a new ByteBuffer. + * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. + * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. * @param newBuffer new aggregation buffer. * @param newBuffer new aggregation buffer. */ - default void relocate(Map oldToNewPositionMap, ByteBuffer newBuffer, ByteBuffer oldBuffer) + default void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer, ByteBuffer oldBuffer) { } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index 742641f31008..4806a45b96dc 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -30,13 +30,10 @@ import java.nio.ByteBuffer; import java.util.AbstractList; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; /** * Grouper based around a hash table and companion array in a single ByteBuffer. Not thread-safe. @@ -431,12 +428,6 @@ private void growIfPossible() final ByteBuffer entryBuffer = tableBuffer.duplicate(); final ByteBuffer keyBuffer = tableBuffer.duplicate(); - List> reloationMaps = new ArrayList<>(aggregators.length); - for (int i = 0; i < aggregators.length ; i++) - { - reloationMaps.add(new HashMap(buckets)); - } - for (int oldBucket = 0; oldBucket < buckets; oldBucket++) { if (isUsed(oldBucket)) { int oldPosition = oldBucket * bucketSize; @@ -457,9 +448,11 @@ private void growIfPossible() newTableBuffer.put(entryBuffer); for (int i = 0; i < aggregators.length; i++) { - reloationMaps.get(i).put( + aggregators[i].relocate( oldPosition + aggregatorOffsets[i], - newPosition + aggregatorOffsets[i] + newPosition + aggregatorOffsets[i], + newTableBuffer, + tableBuffer ); } @@ -468,11 +461,6 @@ private void growIfPossible() } } - for (int i = 0; i < aggregators.length ; i++) - { - aggregators[i].relocate(reloationMaps.get(i), newTableBuffer, tableBuffer); - } - buckets = newBuckets; maxSize = newMaxSize; tableBuffer = newTableBuffer; diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index e1c0db96fa0c..a3a36bdd71fd 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -615,9 +615,7 @@ public T[] runRelocateVerificationTest( newBuf.position(7574); newBuf.put(theBytes); newBuf.position(0); - Map oldToNewPositions = new HashMap<>(); - oldToNewPositions.put(0, 7574); - agg.relocate(oldToNewPositions, newBuf, myBuf); + agg.relocate(0, 7574, newBuf, myBuf); results[1] = (T) agg.get(newBuf, 7574); return results; } From fd1a77fb53a9bbcc93223703927fb0d8bc38846f Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Tue, 21 Mar 2017 09:40:03 -0700 Subject: [PATCH 7/9] Unused import. --- .../java/io/druid/query/aggregation/AggregationTestHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index a3a36bdd71fd..f98b5364a2f7 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -88,7 +88,6 @@ import java.lang.reflect.Array; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; From a6b4f707d0bdfbf879bc7c501502518f9bda12a4 Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Tue, 21 Mar 2017 12:08:08 -0700 Subject: [PATCH 8/9] code comments. --- .../theta/SketchBufferAggregator.java | 24 ++++++++++++++----- .../query/aggregation/BufferAggregator.java | 9 ++++--- .../groupby/epinephelinae/BufferGrouper.java | 4 ++-- .../aggregation/AggregationTestHelper.java | 2 +- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index f9ecf6c79b7b..d4cbc175b198 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -81,11 +81,12 @@ public Object get(ByteBuffer buf, int position) //Note that this is not threadsafe and I don't think it needs to be private Union getUnion(ByteBuffer buf, int position) { - Union union = unions.get(buf) != null ? unions.get(buf).get(position) : null; - if (union == null) { - union = createNewUnion(buf, position, true); + Int2ObjectMap unionMap = unions.get(buf); + Union union = unionMap != null ? unionMap.get(position) : null; + if (union != null) { + return union; } - return union; + return createNewUnion(buf, position, true); } private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) @@ -95,7 +96,10 @@ private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) Union union = isWrapped ? (Union) SetOperation.wrap(mem) : (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION); - Int2ObjectMap unionMap = unions.get(buf) != null ? unions.get(buf) : new Int2ObjectOpenHashMap<>(); + Int2ObjectMap unionMap = unions.get(buf); + if (unionMap == null) { + unionMap = new Int2ObjectOpenHashMap<>(); + } unionMap.put(position, union); unions.put(buf, unionMap); return union; @@ -126,9 +130,17 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) } @Override - public void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer, ByteBuffer oldBuffer) + public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { createNewUnion(newBuffer, newPosition, true); + Int2ObjectMap unionMap = unions.get(oldBuffer); + if (unionMap != null) { + unionMap.remove(oldPosition); + if (unionMap.isEmpty()) { + unions.remove(oldBuffer); + nmCache.remove(oldBuffer); + } + } } private NativeMemory getNativeMemory(ByteBuffer buffer) diff --git a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java index f2a788ebd03f..1951a6df4ae8 100644 --- a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java @@ -24,7 +24,6 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import java.nio.ByteBuffer; -import java.util.Map; /** * A BufferAggregator is an object that can aggregate metrics into a ByteBuffer. Its aggregation-related methods @@ -143,11 +142,11 @@ default void inspectRuntimeShape(RuntimeShapeInspector inspector) * Implementations must not change the position, limit or mark of the given buffer * * @param oldPosition old position of a cached object before aggregation buffer relocates to a new ByteBuffer. - * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. - * @param newBuffer new aggregation buffer. - * @param newBuffer new aggregation buffer. + * @param newPosition new position of a cached object after aggregation buffer relocates to a new ByteBuffer. + * @param oldBuffer old aggregation buffer. + * @param newBuffer new aggregation buffer. */ - default void relocate(int oldPosition, int newPosition, ByteBuffer newBuffer, ByteBuffer oldBuffer) + default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) { } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index 4806a45b96dc..bef3ba226826 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -451,8 +451,8 @@ private void growIfPossible() aggregators[i].relocate( oldPosition + aggregatorOffsets[i], newPosition + aggregatorOffsets[i], - newTableBuffer, - tableBuffer + tableBuffer, + newTableBuffer ); } diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index f98b5364a2f7..df73615e9911 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -614,7 +614,7 @@ public T[] runRelocateVerificationTest( newBuf.position(7574); newBuf.put(theBytes); newBuf.position(0); - agg.relocate(0, 7574, newBuf, myBuf); + agg.relocate(0, 7574, myBuf, newBuf); results[1] = (T) agg.get(newBuf, 7574); return results; } From e9a31935fd5477ce09d894648d3879ce54e30e2a Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Tue, 21 Mar 2017 13:55:42 -0700 Subject: [PATCH 9/9] code comment. --- .../aggregation/datasketches/theta/SketchBufferAggregator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index d4cbc175b198..4d02bea6fa60 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -99,9 +99,9 @@ private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) Int2ObjectMap unionMap = unions.get(buf); if (unionMap == null) { unionMap = new Int2ObjectOpenHashMap<>(); + unions.put(buf, unionMap); } unionMap.put(position, union); - unions.put(buf, unionMap); return union; }