Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private IncrementalIndex makeIncIndex()
aggs,
false,
false,
true,
maxRows
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public class GroupByQueryHelper
{
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
public final static String CTX_KEY_SORT_RESULTS = "sortResults";

public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
Expand Down Expand Up @@ -81,7 +82,9 @@ public String apply(DimensionSpec input)
);
final IncrementalIndex index;

if (query.getContextBoolean("useOffheap", false)) {
final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);

if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
Expand All @@ -90,6 +93,7 @@ public String apply(DimensionSpec input)
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
sortResults,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
bufferPool
);
Expand All @@ -102,6 +106,7 @@ public String apply(DimensionSpec input)
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
sortResults,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,18 @@ private Sequence<Row> mergeGroupByResults(
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
}

final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
final Sequence<Row> subqueryResult = mergeGroupByResults(
subquery.withOverriddenContext(
ImmutableMap.<String, Object>of(
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
//in the end when returning results to user.
GroupByQueryHelper.CTX_KEY_SORT_RESULTS,
false
)
),
runner,
context
);
final Set<AggregatorFactory> aggs = Sets.newHashSet();

// Nested group-bys work by first running the inner query and then materializing the results in an incremental
Expand Down Expand Up @@ -200,7 +211,14 @@ public boolean apply(AggregatorFactory agg)
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
.build();

final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(innerQuery, subqueryResult);
final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(
innerQuery.withOverriddenContext(
ImmutableMap.<String, Object>of(
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true
)
),
subqueryResult
);

//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
//is ensured by QuerySegmentSpec.
Expand Down Expand Up @@ -253,7 +271,10 @@ public Sequence<Row> apply(Interval interval)
query.getContext()
).withOverriddenContext(
ImmutableMap.<String, Object>of(
"finalize", false
"finalize", false,
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
//in the end when returning results to user.
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false
)
)
, context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -340,6 +341,7 @@ public int lookupId(String name)
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
private final boolean reportParseExceptions;
private final boolean sortFacts;
private final Metadata metadata;

private final Map<String, MetricDesc> metricDescs;
Expand Down Expand Up @@ -374,7 +376,8 @@ public InputRow get()
public IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean reportParseExceptions
final boolean reportParseExceptions,
final boolean sortFacts
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
Expand All @@ -383,6 +386,7 @@ public IncrementalIndex(
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.reportParseExceptions = reportParseExceptions;
this.sortFacts = sortFacts;

this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics));

Expand Down Expand Up @@ -441,7 +445,7 @@ private DimDim newDimDim(String dimension, ValueType type) {
// use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation
protected abstract DimDim makeDimDim(String dimension, Object lock);

public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
public abstract ConcurrentMap<TimeAndDims, Integer> getFacts();

public abstract boolean canAppendRow();

Expand Down Expand Up @@ -673,12 +677,20 @@ public int size()

private long getMinTimeMillis()
{
return getFacts().firstKey().getTimestamp();
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).firstKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
}

private long getMaxTimeMillis()
{
return getFacts().lastKey().getTimestamp();
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).lastKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
}

private int[] getDimVals(final DimDim dimLookup, final List<Comparable> dimValues)
Expand Down Expand Up @@ -831,7 +843,11 @@ public ColumnCapabilities getCapabilities(String column)

public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return getFacts().subMap(start, end);
if (sortFacts) {
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).subMap(start, end);
} else {
throw new UnsupportedOperationException("can't get subMap from unsorted facts data.");
}
}

public Metadata getMetadata()
Expand Down Expand Up @@ -862,7 +878,14 @@ public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> pos
public Iterator<Row> iterator()
{
final List<DimensionDesc> dimensions = getDimensions();
final ConcurrentNavigableMap<TimeAndDims, Integer> facts = descending ? getFacts().descendingMap() : getFacts();

Map<TimeAndDims, Integer> facts = null;
if (descending && sortFacts) {
facts = ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).descendingMap();
} else {
facts = getFacts();
}

return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -51,7 +52,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
private final List<int[]> indexAndOffsets = new ArrayList<>();

private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ConcurrentMap<TimeAndDims, Integer> facts;

private final AtomicInteger indexIncrement = new AtomicInteger(0);

Expand All @@ -71,14 +72,20 @@ public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());

if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}

//check that stupid pool gives buffers that can hold at least one row's aggregators
ResourceHolder<ByteBuffer> bb = bufferPool.take();
Expand All @@ -100,6 +107,7 @@ public OffheapIncrementalIndex(
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
Expand All @@ -111,6 +119,7 @@ public OffheapIncrementalIndex(
.build(),
deserializeComplexMetrics,
reportParseExceptions,
sortFacts,
maxRowCount,
bufferPool
);
Expand All @@ -131,13 +140,14 @@ public OffheapIncrementalIndex(
.build(),
true,
true,
true,
maxRowCount,
bufferPool
);
}

@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
public ConcurrentMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.ValueType;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -48,7 +46,7 @@
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ConcurrentMap<TimeAndDims, Integer> facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
Expand All @@ -59,12 +57,18 @@ public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
this.maxRowCount = maxRowCount;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());

if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}
}

public OnheapIncrementalIndex(
Expand All @@ -73,6 +77,7 @@ public OnheapIncrementalIndex(
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount
)
{
Expand All @@ -83,6 +88,7 @@ public OnheapIncrementalIndex(
.build(),
deserializeComplexMetrics,
reportParseExceptions,
sortFacts,
maxRowCount
);
}
Expand All @@ -101,6 +107,7 @@ public OnheapIncrementalIndex(
.build(),
true,
true,
true,
maxRowCount
);
}
Expand All @@ -111,11 +118,11 @@ public OnheapIncrementalIndex(
int maxRowCount
)
{
this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount);
this(incrementalIndexSchema, true, reportParseExceptions, true, maxRowCount);
}

@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
public ConcurrentMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static void setupClass() throws Exception
},
true,
true,
true,
5000
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
Expand Down Expand Up @@ -311,15 +310,15 @@ public void createIndex(
List<File> toMerge = new ArrayList<>();

try {
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
while (rows.hasNext()) {
Object row = rows.next();
if (!index.canAppendRow()) {
File tmp = tempFolder.newFolder();
toMerge.add(tmp);
indexMerger.persist(index, tmp, new IndexSpec());
index.close();
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
}
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void createTestIndex(File segmentDir) throws Exception

IncrementalIndex index = null;
try {
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, 5000);
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, true, 5000);
for (String line : rows) {
index.add(parser.parse(line));
}
Expand Down