Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
*/
public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>, Closeable
public abstract class IncrementalIndex<T> implements Iterable<Row>, Closeable
{
private volatile DateTime maxIngestedEventTime;

Expand Down Expand Up @@ -337,7 +339,6 @@ public int lookupId(String name)
private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
private final boolean reportParseExceptions;
private final Metadata metadata;
Expand All @@ -361,6 +362,12 @@ public InputRow get()
}
};

protected final ConcurrentNavigableMap<TimeAndDims, T> facts;

protected final int maxRowCount;
protected final Map<String, ColumnSelectorFactory> selectors;
protected String outOfRowsReason;

/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
Expand All @@ -374,7 +381,8 @@ public InputRow get()
public IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean reportParseExceptions
final boolean reportParseExceptions,
final int maxRowCount
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
Expand All @@ -384,10 +392,8 @@ public IncrementalIndex(
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.reportParseExceptions = reportParseExceptions;

this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics));

this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
this.columnCapabilities = Maps.newHashMap();
this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics));

this.metricDescs = Maps.newLinkedHashMap();
for (AggregatorFactory metric : metrics) {
Expand Down Expand Up @@ -418,6 +424,18 @@ public IncrementalIndex(
capabilities.setHasSpatialIndexes(true);
columnCapabilities.put(spatialDimension.getDimName(), capabilities);
}
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
this.maxRowCount = maxRowCount;

selectors = Maps.newHashMap();
for (AggregatorFactory agg : metrics) {
selectors.put(
agg.getName(),
new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics))
);
}

initAggs(metrics, rowSupplier, deserializeComplexMetrics);
}

private DimDim newDimDim(String dimension, ValueType type) {
Expand All @@ -441,17 +459,33 @@ private DimDim newDimDim(String dimension, ValueType type) {
// use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation
protected abstract DimDim makeDimDim(String dimension, Object lock);

public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
public ConcurrentNavigableMap<TimeAndDims, T> getFacts()
{
return facts;
}

public abstract boolean canAppendRow();
public boolean canAppendRow()
{
final boolean canAdd = size() < maxRowCount;
if (!canAdd) {
outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount);
}
return canAdd;
}

public abstract String getOutOfRowsReason();
public String getOutOfRowsReason()
{
return outOfRowsReason;
}

protected abstract AggregatorType[] initAggs(
// called in constructor
protected void initAggs(
AggregatorFactory[] metrics,
Supplier<InputRow> rowSupplier,
boolean deserializeComplexMetrics
);
)
{
}

// Note: This method needs to be thread safe.
protected abstract Integer addToFacts(
Expand All @@ -465,20 +499,23 @@ protected abstract Integer addToFacts(
Supplier<InputRow> rowSupplier
) throws IndexSizeExceededException;

protected abstract AggregatorType[] getAggsForRow(int rowOffset);

protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition);
protected abstract float getMetricFloatValue(T agg, int aggOffset);

protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);
protected abstract long getMetricLongValue(T agg, int aggOffset);

protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
protected abstract Object getMetricObjectValue(T agg, int aggOffset);

protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset);
protected abstract void destroy(T aggregators);

@Override
public void close()
{
for (T agg : facts.values()) {
destroy(agg);
}
facts.clear();
dimValues.clear();
selectors.clear();
}

public InputRow formatRow(InputRow row)
Expand Down Expand Up @@ -538,16 +575,17 @@ public Map<String, DimensionDesc> getDimensionDescs()
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
* <p>
* <p>
* <p/>
* <p/>
* Calls to add() are thread safe.
* <p>
* <p/>
*
* @param row the row of data to add
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row) throws IndexSizeExceededException {
public int add(InputRow row) throws IndexSizeExceededException
{
TimeAndDims key = toTimeAndDims(row);
final int rv = addToFacts(
metrics,
Expand Down Expand Up @@ -708,11 +746,6 @@ private int[] getDimVals(final DimDim dimLookup, final List<Comparable> dimValue
return retVal;
}

public AggregatorType[] getAggs()
{
return aggs;
}

public AggregatorFactory[] getMetricAggs()
{
return metrics;
Expand Down Expand Up @@ -829,7 +862,7 @@ public ColumnCapabilities getCapabilities(String column)
return columnCapabilities.get(column);
}

public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
public ConcurrentNavigableMap<TimeAndDims, T> getSubMap(TimeAndDims start, TimeAndDims end)
{
return getFacts().subMap(start, end);
}
Expand Down Expand Up @@ -862,16 +895,18 @@ public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> pos
public Iterator<Row> iterator()
{
final List<DimensionDesc> dimensions = getDimensions();
final ConcurrentNavigableMap<TimeAndDims, Integer> facts = descending ? getFacts().descendingMap() : getFacts();
final ConcurrentNavigableMap<TimeAndDims, T> facts = descending
? getFacts().descendingMap()
: getFacts();
return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
new Function<Map.Entry<TimeAndDims, T>, Row>()
{
@Override
public Row apply(final Map.Entry<TimeAndDims, Integer> input)
public Row apply(final Map.Entry<TimeAndDims, T> input)
{
final TimeAndDims timeAndDims = input.getKey();
final int rowOffset = input.getValue();
final T aggs = input.getValue();

int[][] theDims = timeAndDims.getDims(); //TODO: remove dictionary encoding for numerics later
ValueType[] types = timeAndDims.getTypes();
Expand Down Expand Up @@ -908,9 +943,8 @@ public Row apply(final Map.Entry<TimeAndDims, Integer> input)
}
}

AggregatorType[] aggs = getAggsForRow(rowOffset);
for (int i = 0; i < aggs.length; ++i) {
theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i));
for (int i = 0; i < metrics.length; ++i) {
theVals.put(metrics[i].getName(), getMetricObjectValue(aggs, i));
}

if (postAggs != null) {
Expand Down Expand Up @@ -1273,4 +1307,74 @@ public int compare(TimeAndDims lhs, TimeAndDims rhs)
return retVal;
}
}

// Caches references to selector objects for each column instead of creating a new object each time in order to save heap space.
// In general the selectorFactory need not to thread-safe.
// here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex.
static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
{
private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();
private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap();
private final ColumnSelectorFactory delegate;

public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate)
{
this.delegate = delegate;
}

@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
{
return delegate.makeDimensionSelector(dimensionSpec);
}

@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName);
FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}

@Override
public LongColumnSelector makeLongColumnSelector(String columnName)
{
LongColumnSelector existing = longColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName);
LongColumnSelector prev = longColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}

@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
if (existing != null) {
return existing;
} else {
ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName);
ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent(
columnName,
newSelector
);
return prev != null ? prev : newSelector;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{
private static final Logger log = new Logger(IncrementalIndexAdapter.class);
private final Interval dataInterval;
private final IncrementalIndex<?> index;
private final IncrementalIndex index;
private final Set<String> hasNullValueDimensions;

private final Map<String, DimensionIndexer> indexers;
Expand Down Expand Up @@ -239,16 +239,18 @@ public Iterator<Rowboat> iterator()
*/
return Iterators.transform(
index.getFacts().entrySet().iterator(),
new Function<Map.Entry<IncrementalIndex.TimeAndDims, Integer>, Rowboat>()
new Function<Map.Entry<IncrementalIndex.TimeAndDims, Object>, Rowboat>()
{
int count = 0;

@Override
public Rowboat apply(Map.Entry<IncrementalIndex.TimeAndDims, Integer> input)
public Rowboat apply(
Map.Entry<IncrementalIndex.TimeAndDims, Object> input
)
{
final IncrementalIndex.TimeAndDims timeAndDims = input.getKey();
final int[][] dimValues = timeAndDims.getDims();
final int rowOffset = input.getValue();
final Object aggrs = input.getValue();

int[][] dims = new int[dimValues.length][];
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
Expand All @@ -273,7 +275,7 @@ public Rowboat apply(Map.Entry<IncrementalIndex.TimeAndDims, Integer> input)

Object[] metrics = new Object[index.getMetricAggs().length];
for (int i = 0; i < metrics.length; i++) {
metrics[i] = index.getMetricObjectValue(rowOffset, i);
metrics[i] = index.getMetricObjectValue(aggrs, i);
}

return new Rowboat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ public Cursor apply(@Nullable final Long input)

return new Cursor()
{
private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter;
private ConcurrentNavigableMap<IncrementalIndex.TimeAndDims, Integer> cursorMap;
private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Object>> baseIter;
private ConcurrentNavigableMap<IncrementalIndex.TimeAndDims, Object> cursorMap;
final DateTime time;
int numAdvanced = -1;
boolean done;
Expand Down Expand Up @@ -613,14 +613,14 @@ private ValueMatcher makeFilterMatcher(final Filter filter, final EntryHolder ho

private static class EntryHolder
{
Map.Entry<IncrementalIndex.TimeAndDims, Integer> currEntry = null;
Map.Entry<IncrementalIndex.TimeAndDims, Object> currEntry = null;

public Map.Entry<IncrementalIndex.TimeAndDims, Integer> get()
public Map.Entry<IncrementalIndex.TimeAndDims, Object> get()
{
return currEntry;
}

public void set(Map.Entry<IncrementalIndex.TimeAndDims, Integer> currEntry)
public void set(Map.Entry<IncrementalIndex.TimeAndDims, Object> currEntry)
{
this.currEntry = currEntry;
}
Expand All @@ -630,7 +630,7 @@ public IncrementalIndex.TimeAndDims getKey()
return currEntry.getKey();
}

public Integer getValue()
public Object getValue()
{
return currEntry.getValue();
}
Expand Down
Loading