Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ public Void call() throws Exception
return new ResourceClosingSequence<T>(
Sequences.simple(
Iterables.transform(
indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending()),
indexAccumulatorPair.lhs.iterableWithPostAggregations(
null,
false,
GroupByQueryHelper.isSortResults(query)
),
new Function<Row, T>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public class GroupByQueryHelper
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
public final static String CTX_KEY_SORT_RESULTS = "sortResults";

public static boolean isSortResults(GroupByQuery query)
{
return query.getContextValue(CTX_KEY_SORT_RESULTS, true);
}

public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
final GroupByQuery query,
final GroupByQueryConfig config,
Expand Down Expand Up @@ -82,7 +87,7 @@ public String apply(DimensionSpec input)
);
final IncrementalIndex index;

final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);
final boolean sortResults = isSortResults(query);

if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,13 @@ public Sequence<Row> apply(Interval interval)

} else {
final IncrementalIndex index = makeIncrementalIndex(
query, runner.run(
query.withOverriddenContext(
ImmutableMap.<String, Object>of(
// Don't force sorting here, postAggregate will do it.
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false
)
),
runner.run(
new GroupByQuery(
query.getDataSource(),
query.getQuerySegmentSpec(),
Expand Down Expand Up @@ -287,7 +293,11 @@ public Sequence<Row> apply(Interval interval)
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{
return Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs(), query.isDescending())),
Sequences.simple(index.iterableWithPostAggregations(
query.getPostAggregatorSpecs(),
false,
GroupByQueryHelper.isSortResults(query)
)),
new Function<Row, Row>()
{
@Override
Expand Down Expand Up @@ -428,7 +438,7 @@ public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
return runner.run(query, responseContext);
}
GroupByQuery groupByQuery = (GroupByQuery) query;
if (groupByQuery.getDimFilter() != null){
if (groupByQuery.getDimFilter() != null) {
groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
}
final GroupByQuery delegateGroupByQuery = groupByQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ public Void call() throws Exception
return Sequences.simple(bySegmentAccumulatorPair.lhs);
}

return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null, query.isDescending()));
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(
null,
false,
GroupByQueryHelper.isSortResults(queryParam)
));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ public IncrementalIndex(
}
}

private DimDim newDimDim(String dimension, ValueType type) {
private DimDim newDimDim(String dimension, ValueType type)
{
DimDim newDimDim;
switch (type) {
case LONG:
Expand Down Expand Up @@ -561,7 +562,8 @@ public Map<String, DimensionDesc> getDimensionDescs()
*
* @return the number of rows in the data set after adding the InputRow
*/
public int add(InputRow row) throws IndexSizeExceededException {
public int add(InputRow row) throws IndexSizeExceededException
{
TimeAndDims key = toTimeAndDims(row);
final int rv = addToFacts(
metrics,
Expand Down Expand Up @@ -820,7 +822,12 @@ public void loadDimensionIterable(Iterable<String> oldDimensionOrder)
@GuardedBy("dimensionDescs")
private DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities)
{
DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim, capabilities.getType()), capabilities);
DimensionDesc desc = new DimensionDesc(
dimensionDescs.size(),
dim,
newDimDim(dim, capabilities.getType()),
capabilities
);
if (dimValues.size() != desc.getIndex()) {
throw new ISE("dimensionDescs and dimValues for [%s] is out of sync!!", dim);
}
Expand Down Expand Up @@ -877,10 +884,14 @@ private static AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] a
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null, false).iterator();
return iterableWithPostAggregations(null, false, false).iterator();
}

public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs, final boolean descending)
public Iterable<Row> iterableWithPostAggregations(
final List<PostAggregator> postAggs,
final boolean descending,
final boolean sorted
)
{
return new Iterable<Row>()
{
Expand All @@ -889,15 +900,34 @@ public Iterator<Row> iterator()
{
final List<DimensionDesc> dimensions = getDimensions();

Map<TimeAndDims, Integer> facts = null;
if (descending && sortFacts) {
facts = ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).descendingMap();
final Iterable<Map.Entry<TimeAndDims, Integer>> facts;
if (sorted && sortFacts) {
facts = descending ? ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).descendingMap().entrySet()
: getFacts().entrySet();
} else if (sorted) {
// Materialize and sort
final Comparator<TimeAndDims> comparator = descending
? Ordering.from(dimsComparator()).reverse()
: dimsComparator();
List<Map.Entry<TimeAndDims, Integer>> factsList = Lists.newArrayList(getFacts().entrySet());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may be ok for on-heap index but will unexpectedly put everything on-heap even for any off-heap implementation of IncrementalIndex (for example #2847 )
one option would be to update QueryToolChest to not exercise this path when off-heap merging was requested.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that for the current offheap impl this is fine (even the offheap one still has an onheap facts map) but things are different if the facts map is offheap.

I am ok waiting on this PR until #2847 is settled and then figuring out what is best.

Collections.sort(
factsList,
new Comparator<Map.Entry<TimeAndDims, Integer>>()
{
@Override
public int compare(Map.Entry<TimeAndDims, Integer> a, Map.Entry<TimeAndDims, Integer> b)
{
return comparator.compare(a.getKey(), b.getKey());
}
}
);
facts = factsList;
} else {
facts = getFacts();
facts = getFacts().entrySet();
}

return Iterators.transform(
facts.entrySet().iterator(),
facts.iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
{
@Override
Expand Down