Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,22 @@
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.VectorColumnProcessorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.util.Collections;
Expand All @@ -63,7 +74,8 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
public DoublesSketchAggregatorFactory(
@JsonProperty("name") final String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("k") final Integer k)
@JsonProperty("k") final Integer k
)
{
this(name, fieldName, k, AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
}
Expand Down Expand Up @@ -106,7 +118,7 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFact
{
if (metricFactory.getColumnCapabilities(fieldName) != null
&& ValueType.isNumeric(metricFactory.getColumnCapabilities(fieldName).getType())) {
final ColumnValueSelector<Double> selector = metricFactory.makeColumnValueSelector(fieldName);
final BaseDoubleColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
if (selector instanceof NilColumnValueSelector) {
return new NoopDoublesSketchBufferAggregator();
}
Expand All @@ -119,6 +131,65 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFact
return new DoublesSketchMergeBufferAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{
return ColumnProcessors.makeVectorProcessor(
fieldName,
new VectorColumnProcessorFactory<VectorAggregator>()
{
@Override
public VectorAggregator makeSingleValueDimensionProcessor(
ColumnCapabilities capabilities,
SingleValueDimensionVectorSelector selector
)
{
return new NoopDoublesSketchBufferAggregator();
}

@Override
public VectorAggregator makeMultiValueDimensionProcessor(
ColumnCapabilities capabilities,
MultiValueDimensionVectorSelector selector
)
{
return new NoopDoublesSketchBufferAggregator();
}

@Override
public VectorAggregator makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
{
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
{
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
{
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new DoublesSketchMergeVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}
},
selectorFactory
);
}

@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}

@Override
public Object deserialize(final Object object)
{
Expand Down Expand Up @@ -217,8 +288,9 @@ public List<AggregatorFactory> getRequiredColumns()
new DoublesSketchAggregatorFactory(
fieldName,
fieldName,
k)
);
k
)
);
}

@Override
Expand Down Expand Up @@ -306,10 +378,10 @@ public int hashCode()
public String toString()
{
return getClass().getSimpleName() + "{"
+ "name=" + name
+ ", fieldName=" + fieldName
+ ", k=" + k
+ "}";
+ "name=" + name
+ ", fieldName=" + fieldName
+ ", k=" + k
+ "}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,52 @@

package org.apache.druid.query.aggregation.datasketches.quantiles;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;

public class DoublesSketchBuildBufferAggregator implements BufferAggregator
{

private final ColumnValueSelector<Double> selector;
private final int size;
private final int maxIntermediateSize;
private final BaseDoubleColumnValueSelector selector;
private final DoublesSketchBuildBufferAggregatorHelper helper;

private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();

public DoublesSketchBuildBufferAggregator(final ColumnValueSelector<Double> valueSelector, final int size,
final int maxIntermediateSize)
public DoublesSketchBuildBufferAggregator(
final BaseDoubleColumnValueSelector valueSelector,
final int size,
final int maxIntermediateSize
)
{
this.selector = valueSelector;
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
this.helper = new DoublesSketchBuildBufferAggregatorHelper(size, maxIntermediateSize);
}

@Override
public synchronized void init(final ByteBuffer buffer, final int position)
public void init(ByteBuffer buf, int position)
{
final WritableMemory mem = getMemory(buffer);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
putSketch(buffer, position, sketch);
helper.init(buf, position);
}

@Override
public synchronized void aggregate(final ByteBuffer buffer, final int position)
public void aggregate(final ByteBuffer buffer, final int position)
{
if (selector.isNull()) {
return;
}
final UpdateDoublesSketch sketch = sketches.get(buffer).get(position);

final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position);
sketch.update(selector.getDouble());
}

@Nullable
@Override
public synchronized Object get(final ByteBuffer buffer, final int position)
public Object get(ByteBuffer buf, int position)
{
return sketches.get(buffer).get(position).compact();
return helper.get(buf, position);
}

@Override
Expand All @@ -88,48 +80,22 @@ public long getLong(final ByteBuffer buffer, final int position)
}

@Override
public synchronized void close()
public void close()
{
sketches.clear();
memCache.clear();
helper.clear();
}

// A small number of sketches may run out of the given memory, request more memory on heap and move there.
// In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
@Override
public synchronized void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
sketch = UpdateDoublesSketch.wrap(newRegion);
}
putSketch(newBuffer, newPosition, sketch);

final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
map.remove(oldPosition);
if (map.isEmpty()) {
sketches.remove(oldBuffer);
memCache.remove(oldBuffer);
}
}

private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
}

private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
map.put(position, sketch);
helper.relocate(oldPosition, newPosition, oldBuffer, newBuffer);
}

@Override
public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.datasketches.quantiles;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.CompactDoublesSketch;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.IdentityHashMap;

public class DoublesSketchBuildBufferAggregatorHelper
{
private final int size;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<UpdateDoublesSketch>> sketches = new IdentityHashMap<>();

public DoublesSketchBuildBufferAggregatorHelper(final int size, final int maxIntermediateSize)
{
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
}

public void init(final ByteBuffer buffer, final int position)
{
final WritableMemory mem = getMemory(buffer);
final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
final UpdateDoublesSketch sketch = DoublesSketch.builder().setK(size).build(region);
putSketch(buffer, position, sketch);
}

public CompactDoublesSketch get(final ByteBuffer buffer, final int position)
{
return sketches.get(buffer).get(position).compact();
}

// A small number of sketches may run out of the given memory, request more memory on heap and move there.
// In that case we need to reuse the object from the cache as opposed to wrapping the new buffer.
public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer)
{
UpdateDoublesSketch sketch = sketches.get(oldBuffer).get(oldPosition);
final WritableMemory oldRegion = getMemory(oldBuffer).writableRegion(oldPosition, maxIntermediateSize);
if (sketch.isSameResource(oldRegion)) { // sketch was not relocated on heap
final WritableMemory newRegion = getMemory(newBuffer).writableRegion(newPosition, maxIntermediateSize);
sketch = UpdateDoublesSketch.wrap(newRegion);
}
putSketch(newBuffer, newPosition, sketch);

final Int2ObjectMap<UpdateDoublesSketch> map = sketches.get(oldBuffer);
map.remove(oldPosition);
if (map.isEmpty()) {
sketches.remove(oldBuffer);
memCache.remove(oldBuffer);
}
}

public void clear()
{
sketches.clear();
memCache.clear();
}

/**
* Retrieves the sketch at a particular position.
*/
public UpdateDoublesSketch getSketchAtPosition(final ByteBuffer buf, final int position)
{
return sketches.get(buf).get(position);
}

private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN));
}

private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
{
Int2ObjectMap<UpdateDoublesSketch> map = sketches.computeIfAbsent(buffer, buf -> new Int2ObjectOpenHashMap<>());
map.put(position, sketch);
}
}
Loading