Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/druid/memory/BufferHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.druid.memory;

import java.nio.ByteBuffer;

public interface BufferHolder
{
int position();
int capacity();
ByteBuffer get();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.druid.memory;

import java.nio.ByteBuffer;

public interface MemoryAllocator
{
BufferHolder allocate(int capacity);
void free(BufferHolder bh);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.apache.druid.memory;

import com.google.common.base.Suppliers;

import java.nio.ByteBuffer;
import java.util.function.Supplier;

public class SimpleOnHeapMemoryAllocator implements MemoryAllocator
{
@Override
public BufferHolder allocate(int capacity)
{
return new SimplerBufferHolder(ByteBuffer.allocate(capacity));
}

@Override
public void free(BufferHolder ignored)
{

}

private static class SimplerBufferHolder implements BufferHolder
{
private final ByteBuffer bb;

public SimplerBufferHolder(ByteBuffer bb)
{
this.bb = bb;
}

@Override
public int position()
{
return 0;
}

@Override
public int capacity()
{
return bb.capacity();
}

@Override
public ByteBuffer get()
{
return bb;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -212,6 +213,26 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre
*/
public abstract int getMaxIntermediateSize();

/**
* Does BufferAggregator support handling of varying ByteBuffer sizes by overriding
* {@link BufferAggregator#aggregate(ByteBuffer, int, int)}
* @return
*/
public boolean isDynamicallyResizable()
{
return getMinIntermediateSize() < getMaxIntermediateSize();
}

/**
* Start size of ByteBuffer to be used with BufferAggregator.
* @return
*/
public int getMinIntermediateSize()
{
return getMaxIntermediateSize();
}


/**
* Returns the maximum size that this aggregator will require in bytes for intermediate storage of results.
* Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.druid.query.aggregation;

import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.memory.BufferHolder;
import org.apache.druid.memory.MemoryAllocator;
import org.apache.druid.query.monomorphicprocessing.CalledFromHotLoop;
import org.apache.druid.query.monomorphicprocessing.HotLoopCallee;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand Down Expand Up @@ -209,4 +211,52 @@ default boolean isNull(ByteBuffer buf, int position)
return false;
}

/**
* Returns false if aggregation requires a bigger buffer than capacity arg or true.
* Return status must be used exclusively to signal "low memory in buffer" condition and
* nothing else.
*/
default boolean aggregate(ByteBuffer buff, int position, int capacity)
{
aggregate(buff, position);
return true;
}

// Following methods are equivalent of old methods with same name except they provide access to capacity
// of ByteBuffer which is assumed to be AggregatorFactory.getMaxIntermediateSize() by older methods.

default void init(ByteBuffer buff, int position, int capacity)
{
init(buff, position);
}

default void relocate(ByteBuffer oldBuff, int oldPosition, int oldCapacity, ByteBuffer newwBuff, int newwPosition, int newwCapacity)
{
relocate(oldPosition, newwPosition, oldBuff, newwBuff);
}

default Object get(ByteBuffer buff, int position, int capacity)
{
return get(buff, position);
}

default float getFloat(ByteBuffer buff, int position, int capacity)
{
return getFloat(buff, position);
}

default double getDouble(ByteBuffer buff, int position, int capacity)
{
return getDouble(buff, position);
}

default long getLong(ByteBuffer buff, int position, int capacity)
{
return getLong(buff, position);
}

default boolean isNull(ByteBuffer buff, int position, int capacity)
{
return isNull(buff, position);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,6 @@ protected abstract AddToFactsResult addToFacts(

public abstract int getLastRowIndex();

protected abstract AggregatorType[] getAggsForRow(int rowOffset);

protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition);

protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);

protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
Expand Down Expand Up @@ -1029,9 +1025,8 @@ public Iterator<Row> iterator()
theVals.put(dimensionName, rowVals);
}

AggregatorType[] aggs = getAggsForRow(rowOffset);
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
for (int i = 0; i < metrics.length; ++i) {
theVals.put(metrics[i].getName(), getMetricObjectValue(rowOffset, i));
}

if (postAggs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,6 @@ public String getOutOfRowsReason()
return outOfRowsReason;
}

@Override
protected BufferAggregator[] getAggsForRow(int rowOffset)
{
return getAggs();
}

@Override
protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition)
{
int[] indexAndOffset = indexAndOffsets.get(rowOffset);
ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]);
}

@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
Expand Down
Loading