2 changes: 1 addition & 1 deletion docs/content/querying/scan-query.md
@@ -61,7 +61,7 @@ The following are the main parameters for Scan queries:
|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
|batchSize|The maximum number of rows buffered before being returned to the client. Default is `20480`.|no|
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to an order of "none".|none|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met. See the example below this table.|none|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no|
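For example, a descending time-ordered scan must include `__time` in `columns` (or leave `columns` empty). A minimal sketch; the datasource, interval, and column names here are placeholders, not part of this change:

```json
{
  "queryType": "scan",
  "dataSource": "wikipedia",
  "intervals": ["2015-01-01/2015-01-02"],
  "columns": ["__time", "page", "count"],
  "order": "descending",
  "limit": 100
}
```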

ScanQuery.java
@@ -34,6 +34,7 @@
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;

import javax.annotation.Nullable;
import java.util.List;
@@ -149,6 +150,12 @@ public ScanQuery(
this.columns = columns;
this.legacy = legacy;
this.order = (order == null) ? Order.NONE : order;
if (this.order != Order.NONE) {
Preconditions.checkArgument(
columns == null || columns.isEmpty() || columns.contains(ColumnHolder.TIME_COLUMN_NAME),
"The __time column must be selected if the results are time-ordered."
);
}
this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
}
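The guard accepts three cases: a null column list, an empty list (both mean "return all columns", which always includes `__time`), or a list that names `__time` explicitly. A standalone sketch of the same predicate (not Druid code; the constant stands in for `ColumnHolder.TIME_COLUMN_NAME`):

```java
import java.util.Arrays;
import java.util.List;

public class TimeOrderGuardSketch
{
  private static final String TIME_COLUMN_NAME = "__time"; // stands in for ColumnHolder.TIME_COLUMN_NAME

  // Mirrors the constructor check: a time-ordered scan must be able to read timestamps.
  static boolean selectableForTimeOrdering(List<String> columns)
  {
    return columns == null || columns.isEmpty() || columns.contains(TIME_COLUMN_NAME);
  }

  public static void main(String[] args)
  {
    System.out.println(selectableForTimeOrdering(null));                            // true: all columns returned
    System.out.println(selectableForTimeOrdering(Arrays.asList("__time", "page"))); // true
    System.out.println(selectableForTimeOrdering(Arrays.asList("page")));           // false: constructor throws
  }
}
```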
@@ -256,6 +263,9 @@ public Boolean isLegacy()
@Override
public Ordering<ScanResultValue> getResultOrdering()
{
if (order == Order.NONE) {
return Ordering.natural();
}
return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse();
}
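Both branches hand back a Guava `Ordering`. A small standalone demo of the wrapper semantics (plain Guava, not Druid code): `Ordering.from(comparator).reverse()` flips every comparison the wrapped comparator makes.

```java
import com.google.common.collect.Ordering;

import java.util.Arrays;
import java.util.Comparator;

public class OrderingDemo
{
  public static void main(String[] args)
  {
    Comparator<Long> byValue = Long::compare;
    // Ordering.from adapts a plain Comparator into Guava's fluent Ordering.
    System.out.println(Ordering.from(byValue).sortedCopy(Arrays.asList(3L, 1L, 2L)));           // [1, 2, 3]
    // reverse() inverts the comparator's result on every comparison.
    System.out.println(Ordering.from(byValue).reverse().sortedCopy(Arrays.asList(3L, 1L, 2L))); // [3, 2, 1]
  }
}
```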

ScanQueryRunnerFactory.java
@@ -40,11 +40,14 @@
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.joda.time.Interval;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedHashMap;
@@ -111,17 +114,7 @@ public QueryRunner<ScanResultValue> mergeRunners(
return returnedRows;
}
} else {
// Query segment spec must be an instance of MultipleSpecificSegmentSpec because segment descriptors need
// to be present for a 1:1 matching of intervals with query runners. The other types of segment spec condense
// the intervals (i.e. merge neighbouring intervals), eliminating the 1:1 relationship between intervals
// and query runners.
if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+ "of type MultipleSpecificSegmentSpec");
}
// Ascending time order for both descriptors and query runners by default
List<SegmentDescriptor> descriptorsOrdered =
((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors();
List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);

if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
@@ -286,6 +279,28 @@ public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue i
return Sequences.simple(sortedElements);
}
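The `sortedElements` returned above come from an in-memory ordering step. A toy model of that step, under the assumption that it behaves like a timestamp-keyed priority queue drained up to the limit (the real code also enforces `druid.query.scan.maxRowsQueuedForOrdering`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PriorityQueueSortSketch
{
  // Rows are {timestamp, value} pairs; poll order follows the requested direction.
  static List<long[]> sortAndLimit(List<long[]> rows, int limit, boolean descending)
  {
    Comparator<long[]> byTime = Comparator.comparingLong(r -> r[0]);
    PriorityQueue<long[]> queue = new PriorityQueue<>(descending ? byTime.reversed() : byTime);
    queue.addAll(rows);

    List<long[]> out = new ArrayList<>();
    while (!queue.isEmpty() && out.size() < limit) {
      out.add(queue.poll());
    }
    return out;
  }

  public static void main(String[] args)
  {
    List<long[]> rows = Arrays.asList(new long[]{3, 30}, new long[]{1, 10}, new long[]{2, 20});
    sortAndLimit(rows, 2, false).forEach(r -> System.out.println(Arrays.toString(r))); // [1, 10] then [2, 20]
  }
}
```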

@VisibleForTesting
List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
{
// Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
// segment descriptors need to be present for a 1:1 matching of intervals with query runners.
// The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
// the 1:1 relationship between intervals and query runners.
List<SegmentDescriptor> descriptorsOrdered;

if (spec instanceof MultipleSpecificSegmentSpec) {
// Ascending time order for both descriptors and query runners by default
descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
} else if (spec instanceof SpecificSegmentSpec) {
descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
} else {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs "
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec; a [%s] was received instead.",
spec.getClass().getSimpleName());
}
return descriptorsOrdered;
}
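A hedged usage sketch of the two accepted shapes (the interval, version, and partition number are made-up values; constructor signatures assumed from the classes above):

```java
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;

import java.util.Collections;

public class SpecShapesSketch
{
  public static void main(String[] args)
  {
    SegmentDescriptor descriptor =
        new SegmentDescriptor(Intervals.of("2015-01-01/2015-01-02"), "v1", 0);
    // Broker side: one descriptor per segment, already in ascending time order.
    MultipleSpecificSegmentSpec multiple =
        new MultipleSpecificSegmentSpec(Collections.singletonList(descriptor));
    // Historical side: a single segment's descriptor.
    SpecificSegmentSpec single = new SpecificSegmentSpec(descriptor);
    // Both produce the same descriptor list; any other spec type triggers the UOE.
  }
}
```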

@VisibleForTesting
Sequence<ScanResultValue> nWayMergeAndLimit(
List<List<QueryRunner<ScanResultValue>>> groupedRunners,
ScanResultValue.java
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.column.ColumnHolder;

@@ -78,9 +79,16 @@ public Object getEvents()
public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
{
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
return (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
Long timestamp = (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
if (timestamp == null) {
throw new ISE("Unable to compare timestamp for rows without a time column");
Contributor (inline review comment):
This restriction should be more prominent:

  • Check it in ScanQuery's constructor, so the Broker will reject the query if it's not going to work.
  • Add a note to the documentation that you must include the __time column if you are doing time ordering.

}
return timestamp;
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
if (timeColumnIndex == -1) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
return (Long) firstEvent.get(timeColumnIndex);
}
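A toy illustration of the two row shapes this method handles (timestamps and values are made up; not Druid code):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResultFormatSketch
{
  public static void main(String[] args)
  {
    // resultFormat "list": each event is a column-name -> value map.
    Map<String, Object> listEvent = new HashMap<>();
    listEvent.put("__time", 1546300800000L);
    listEvent.put("page", "Main_Page");
    long fromList = (Long) listEvent.get("__time"); // a null here means __time was not selected

    // resultFormat "compactedList": each event is a positional list, resolved via the columns header.
    List<String> columns = Arrays.asList("__time", "page");
    List<Object> compactEvent = Arrays.asList((Object) 1546300800000L, "Main_Page");
    long fromCompact = (Long) compactEvent.get(columns.indexOf("__time")); // index -1 means no __time

    System.out.println(fromList == fromCompact); // true
  }
}
```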
ScanResultValueTimestampComparator.java
@@ -42,8 +42,7 @@ public ScanResultValueTimestampComparator(ScanQuery scanQuery)
@Override
public int compare(ScanResultValue o1, ScanResultValue o2)
{
int comparison;
comparison = Longs.compare(
int comparison = Longs.compare(
o1.getFirstEventTimestamp(scanQuery.getResultFormat()),
o2.getFirstEventTimestamp(scanQuery.getResultFormat()));
if (scanQuery.getOrder().equals(ScanQuery.Order.DESCENDING)) {
MultipleSpecificSegmentSpec.java
@@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.query.Query;
@@ -64,14 +63,7 @@ public List<Interval> getIntervals()
intervals = JodaUtils.condenseIntervals(
Iterables.transform(
descriptors,
new Function<SegmentDescriptor, Interval>()
{
@Override
public Interval apply(SegmentDescriptor input)
{
return input.getInterval();
}
}
input -> input.getInterval()
)
);
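The lambda is behavior-identical to the removed anonymous `Function`; a method reference (`SegmentDescriptor::getInterval`) would work equally well. For context, a hedged sketch of what condensing does, and why condensed specs cannot drive time ordering: abutting or overlapping intervals are merged, destroying the 1:1 descriptor-to-runner mapping.

```java
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.List;

public class CondenseSketch
{
  public static void main(String[] args)
  {
    // Two abutting day-long intervals collapse into one after condensing.
    List<Interval> condensed = JodaUtils.condenseIntervals(Arrays.asList(
        Intervals.of("2015-01-01/2015-01-02"),
        Intervals.of("2015-01-02/2015-01-03")
    ));
    System.out.println(condensed); // expected: a single 2015-01-01/2015-01-03 interval
  }
}
```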
