Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions extensions-core/datasketches/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
Expand All @@ -178,6 +183,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-hll</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -77,6 +80,18 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls());
}

/**
 * Builds the vectorized aggregator for this factory.
 *
 * @param selectorFactory vector selector factory handed through to the new {@link SketchVectorAggregator}
 * @return a {@link SketchVectorAggregator} configured with this factory's field name, size and
 *         null-aware maximum intermediate size
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{
  // Same sizing value that factorizeBuffered passes to SketchBufferAggregator.
  final int maxIntermediateSizeWithNulls = getMaxIntermediateSizeWithNulls();
  return new SketchVectorAggregator(selectorFactory, fieldName, size, maxIntermediateSizeWithNulls);
}

/**
 * Reports whether this factory supports vectorized aggregation.
 *
 * @param columnInspector not consulted by this implementation
 * @return always {@code true}
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
// Unconditional: the answer does not depend on the inspected columns.
return true;
}

@Override
public Object deserialize(Object object)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,29 @@

package org.apache.druid.query.aggregation.datasketches.theta;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.Family;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;

public class SketchBufferAggregator implements BufferAggregator
{
private final BaseObjectColumnValueSelector selector;
private final int size;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final SketchBufferAggregatorHelper helper;

public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize)
{
this.selector = selector;
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
this.helper = new SketchBufferAggregatorHelper(size, maxIntermediateSize);
}

@Override
public void init(ByteBuffer buf, int position)
{
createNewUnion(buf, position, false);
helper.init(buf, position);
}

@Override
Expand All @@ -62,49 +52,16 @@ public void aggregate(ByteBuffer buf, int position)
return;
}

Union union = getOrCreateUnion(buf, position);
Union union = helper.getOrCreateUnion(buf, position);
SketchAggregator.updateUnion(union, update);
}


@Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
Int2ObjectMap<Union> unionMap = unions.get(buf);
Union union = unionMap != null ? unionMap.get(position) : null;
if (union == null) {
return SketchHolder.EMPTY;
}
//in the code below, I am returning SetOp.getResult(true, null)
//"true" returns an ordered sketch but slower to compute than unordered sketch.
//however, advantage of ordered sketch is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return SketchHolder.of(union.getResult(true, null));
}

private Union getOrCreateUnion(ByteBuffer buf, int position)
{
Int2ObjectMap<Union> unionMap = unions.get(buf);
Union union = unionMap != null ? unionMap.get(position) : null;
if (union != null) {
return union;
}
return createNewUnion(buf, position, true);
}

private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
{
WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize);
Union union = isWrapped
? (Union) SetOperation.wrap(mem)
: (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem);
Int2ObjectMap<Union> unionMap = unions.get(buf);
if (unionMap == null) {
unionMap = new Int2ObjectOpenHashMap<>();
unions.put(buf, unionMap);
}
unionMap.put(position, union);
return union;
return helper.get(buf, position);
}

@Override
Expand All @@ -128,8 +85,7 @@ public double getDouble(ByteBuffer buf, int position)
@Override
public void close()
{
unions.clear();
memCache.clear();
helper.close();
}

@Override
Expand All @@ -141,25 +97,6 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
@Override
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
createNewUnion(newBuffer, newPosition, true);
Int2ObjectMap<Union> unionMap = unions.get(oldBuffer);
if (unionMap != null) {
unionMap.remove(oldPosition);
if (unionMap.isEmpty()) {
unions.remove(oldBuffer);
memCache.remove(oldBuffer);
}
}
}

private WritableMemory getMemory(ByteBuffer buffer)
{
WritableMemory mem = memCache.get(buffer);
if (mem == null) {
mem = WritableMemory.wrap(buffer, ByteOrder.LITTLE_ENDIAN);
memCache.put(buffer, mem);
}
return mem;
helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.datasketches.theta;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.Family;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;

/**
 * Shared buffer-side bookkeeping for {@link SketchBufferAggregator} and {@link SketchVectorAggregator}.
 *
 * Keeps one {@link Union} per (buffer, position) pair and one {@link WritableMemory} wrapper per buffer.
 * Buffers are keyed by identity. Reading input objects from value selectors is the caller's job, not
 * this class's.
 */
final class SketchBufferAggregatorHelper
{
  private final int size;
  private final int maxIntermediateSize;
  private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
  private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();

  /**
   * @param size                nominal entries used when a brand new union is built
   * @param maxIntermediateSize number of bytes of the buffer given to each union, starting at its position
   */
  public SketchBufferAggregatorHelper(final int size, final int maxIntermediateSize)
  {
    this.size = size;
    this.maxIntermediateSize = maxIntermediateSize;
  }

  /**
   * Backs {@link org.apache.druid.query.aggregation.BufferAggregator#init} and
   * {@link org.apache.druid.query.aggregation.VectorAggregator#init}: builds a fresh union over the
   * region starting at {@code position} and caches it.
   */
  public void init(ByteBuffer buf, int position)
  {
    createNewUnion(buf, position, false);
  }

  /**
   * Backs {@link org.apache.druid.query.aggregation.BufferAggregator#get} and
   * {@link org.apache.druid.query.aggregation.VectorAggregator#get}.
   *
   * @return {@link SketchHolder#EMPTY} when no union is cached for the location, otherwise a holder
   *         around the union's current result
   */
  public Object get(ByteBuffer buf, int position)
  {
    final Union cached = findCachedUnion(buf, position);
    if (cached != null) {
      // getResult(true, null) requests an ordered sketch. Per the original author's note, ordered
      // sketches cost more to produce but are faster to union later, and results coming out of the
      // aggregator are going to be combined further.
      return SketchHolder.of(cached.getResult(true, null));
    }
    return SketchHolder.EMPTY;
  }

  /**
   * Backs {@link org.apache.druid.query.aggregation.BufferAggregator#relocate} and
   * {@link org.apache.druid.query.aggregation.VectorAggregator#relocate}.
   *
   * Wraps the bytes at the new location in a union first, then forgets the old location. Once the old
   * buffer has no cached unions left, its memory wrapper is dropped as well.
   */
  public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
  {
    createNewUnion(newBuffer, newPosition, true);

    final Int2ObjectMap<Union> oldBufferUnions = unions.get(oldBuffer);
    if (oldBufferUnions == null) {
      return;
    }
    oldBufferUnions.remove(oldPosition);
    if (oldBufferUnions.isEmpty()) {
      unions.remove(oldBuffer);
      memCache.remove(oldBuffer);
    }
  }

  /**
   * Returns the {@link Union} associated with a particular buffer location, wrapping the bytes already
   * present there if nothing is cached yet.
   *
   * The returned union stays cached in this helper until the location is relocated away or
   * {@link #close()} is called.
   */
  public Union getOrCreateUnion(ByteBuffer buf, int position)
  {
    final Union cached = findCachedUnion(buf, position);
    if (cached == null) {
      return createNewUnion(buf, position, true);
    }
    return cached;
  }

  /**
   * Drops every cached union and memory wrapper.
   */
  public void close()
  {
    unions.clear();
    memCache.clear();
  }

  /**
   * Looks up the cached union for a location without creating anything.
   *
   * @return the cached union, or null if the buffer or the position is unknown
   */
  private Union findCachedUnion(ByteBuffer buf, int position)
  {
    final Int2ObjectMap<Union> perBuffer = unions.get(buf);
    if (perBuffer == null) {
      return null;
    }
    return perBuffer.get(position);
  }

  /**
   * Creates a union over the {@code maxIntermediateSize}-byte region at {@code position} and records
   * it in the cache, replacing any earlier entry for the same location.
   *
   * @param isWrapped true to wrap the existing contents of the region, false to build a new union
   *                  there with {@code size} nominal entries
   */
  private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
  {
    final WritableMemory region = getMemory(buf).writableRegion(position, maxIntermediateSize);

    final Union created;
    if (isWrapped) {
      created = (Union) SetOperation.wrap(region);
    } else {
      created = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, region);
    }

    unions.computeIfAbsent(buf, ignored -> new Int2ObjectOpenHashMap<>()).put(position, created);
    return created;
  }

  /**
   * Returns the little-endian {@link WritableMemory} view of {@code buffer}, wrapping it on first use.
   */
  private WritableMemory getMemory(ByteBuffer buffer)
  {
    return memCache.computeIfAbsent(buffer, b -> WritableMemory.wrap(b, ByteOrder.LITTLE_ENDIAN));
  }
}
Loading