Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ protected abstract AggregatorType[] initAggs(
boolean deserializeComplexMetrics
);

// Note: This method needs to be thread safe.
protected abstract Integer addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;

import java.util.Arrays;
import java.util.Map;
Expand All @@ -45,6 +51,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;

private String outOfRowsReason = null;

Expand Down Expand Up @@ -118,6 +125,14 @@ protected Aggregator[] initAggs(
AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
)
{
selectors = Maps.newHashMap();
for (AggregatorFactory agg : metrics) {
selectors.put(
agg.getName(),
new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics))
);
}

return new Aggregator[metrics.length];
}

Expand All @@ -144,7 +159,7 @@ protected Integer addToFacts(
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
selectors.get(agg.getName())
);
}
final Integer rowIndex = indexIncrement.getAndIncrement();
Expand Down Expand Up @@ -253,6 +268,9 @@ public void close()
super.close();
aggregators.clear();
facts.clear();
if (selectors != null) {
selectors.clear();
}
}

private static class OnHeapDimDim implements DimDim
Expand Down Expand Up @@ -345,4 +363,75 @@ public boolean compareCanonicalValues(String s1, String s2)
return s1 == s2;
}
}

// Caches references to selector objetcs for each column instead of creating a new object each time in order to save heap space.
// In general the selectorFactory need not to thread-safe.
// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
{
private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap();
private final ColumnSelectorFactory delegate;

public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate)
{
this.delegate = delegate;
}

@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
return delegate.makeDimensionSelector(dimensionSpec);
}

@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName);
FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}

@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
LongColumnSelector existing = longColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName);
LongColumnSelector prev = longColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}

@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName);
ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}
}

}