diff --git a/distribution/pom.xml b/distribution/pom.xml index 4dca3b03f514..8845d18a31c5 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -232,8 +232,6 @@ -c io.druid.extensions.contrib:druid-redis-cache -c - io.druid.extensions.contrib:scan-query - -c io.druid.extensions.contrib:sqlserver-metadata-storage -c io.druid.extensions.contrib:statsd-emitter diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 3f67839c62bc..bc7283060425 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -70,7 +70,6 @@ All of these community extensions can be downloaded using *pull-deps* with the c |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| |kafka-emitter|Kafka metrics emitter|[link](../development/extensions-contrib/kafka-emitter.html)| |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)| -|scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)| ## Promoting Community Extension to Core Extension diff --git a/docs/content/development/extensions-contrib/scan-query.md b/docs/content/querying/scan-query.md similarity index 82% rename from docs/content/development/extensions-contrib/scan-query.md rename to docs/content/querying/scan-query.md index 3eef162b254e..7869d564d248 100644 --- a/docs/content/development/extensions-contrib/scan-query.md +++ b/docs/content/querying/scan-query.md @@ -31,8 +31,11 @@ There are several main parts to a scan query: |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| |batchSize|How many rows buffered before return to client. Default is `20480`|no| |limit|How many rows to return. If not specified, all rows will be returned.|no| +|legacy|Return results consistent with the legacy "scan-query" contrib extension. 
Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no| |context|An additional JSON Object which can be used to specify certain flags.|no| +## Example results + The format of the result when resultFormat equals to `list`: ```json @@ -154,4 +157,19 @@ The format of the result when resultFormat equals to `compactedList`: The biggest difference between select query and scan query is that, scan query doesn't retain all rows in memory before rows can be returned to client. It will cause memory pressure if too many rows required by select query. Scan query doesn't have this issue. -Scan query can return all rows without issuing another pagination query, which is extremely useful when query against historical or realtime node directly. \ No newline at end of file +Scan query can return all rows without issuing another pagination query, which is extremely useful when querying historical or realtime nodes directly. + +## Legacy mode + +The Scan query supports a legacy mode designed for protocol compatibility with the former scan-query contrib extension. +In legacy mode you can expect the following behavior changes: + +- The __time column is returned as "timestamp" rather than "__time". This will take precedence over any other column +you may have that is named "timestamp". +- The __time column is included in the list of columns even if you do not specifically ask for it. +- Timestamps are returned as ISO8601 time strings rather than integers (milliseconds since 1970-01-01 00:00:00 UTC). + +Legacy mode can be triggered either by passing `"legacy" : true` in your query JSON, or by setting +`druid.query.scan.legacy = true` on your Druid nodes. If you were previously using the scan-query contrib extension, +the best way to migrate is to activate legacy mode during a rolling upgrade, then switch it off after the upgrade +is complete. 
diff --git a/docs/content/querying/select-query.md b/docs/content/querying/select-query.md index 41960d30a97e..f3d302bbf2ca 100644 --- a/docs/content/querying/select-query.md +++ b/docs/content/querying/select-query.md @@ -2,6 +2,7 @@ layout: doc_page --- # Select Queries + Select queries return raw Druid rows and support pagination. ```json @@ -19,6 +20,13 @@ Select queries return raw Druid rows and support pagination. } ``` +
+Consider using the [Scan query](scan-query.html) instead of the Select query if you don't need pagination, and you +don't need the strict time-ascending or time-descending ordering offered by the Select query. The Scan query returns +results without pagination, and offers "looser" ordering than Select, but is significantly more efficient in terms of +both processing time and memory requirements. It is also capable of returning a virtually unlimited number of results. +
+ There are several main parts to a select query: |property|description|required?| diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index e2b0e3e21a9a..911e2c86c16d 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -256,7 +256,9 @@ converted to zeroes). ## Query execution -Queries without aggregations will use Druid's [Select](select-query.html) native query type. +Queries without aggregations will use Druid's [Scan](scan-query.html) or [Select](select-query.html) native query types. +Scan is used whenever possible, as it is generally higher performance and more efficient than Select. However, Select +is used in one case: when the query includes an `ORDER BY __time`, since Scan does not have a sorting feature. Aggregation queries (using GROUP BY, DISTINCT, or any aggregation functions) will use one of Druid's three native aggregation query types. Two (Timeseries and TopN) are specialized for specific types of aggregations, whereas the other diff --git a/docs/content/toc.md b/docs/content/toc.md index 6eca667d36c9..d18de0e7c734 100644 --- a/docs/content/toc.md +++ b/docs/content/toc.md @@ -34,6 +34,7 @@ layout: toc * [DataSource Metadata](/docs/VERSION/querying/datasourcemetadataquery.html) * [Search](/docs/VERSION/querying/searchquery.html) * [Select](/docs/VERSION/querying/select-query.html) + * [Scan](/docs/VERSION/querying/scan-query.html) * Components * [Datasources](/docs/VERSION/querying/datasource.html) * [Filters](/docs/VERSION/querying/filters.html) diff --git a/extensions-contrib/scan-query/pom.xml b/extensions-contrib/scan-query/pom.xml deleted file mode 100644 index 328be800e5d7..000000000000 --- a/extensions-contrib/scan-query/pom.xml +++ /dev/null @@ -1,63 +0,0 @@ - - - - - - - io.druid - druid - 0.11.0-SNAPSHOT - ../../pom.xml - - 4.0.0 - - io.druid.extensions.contrib - scan-query - scan-query - streaming version of select query - - - - io.druid - druid-server - ${project.parent.version} - - - 
junit - junit - test - - - org.easymock - easymock - test - - - io.druid - druid-processing - ${project.parent.version} - tests - test - - - - diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java deleted file mode 100644 index e8696d1a0a86..000000000000 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package io.druid.query.scan; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.inject.Binder; -import io.druid.guice.DruidBinders; -import io.druid.guice.LazySingleton; -import io.druid.initialization.DruidModule; - -import java.util.Arrays; -import java.util.List; - -public class ScanQueryDruidModule implements DruidModule -{ - @Override - public void configure(Binder binder) - { - DruidBinders.queryToolChestBinder(binder) - .addBinding(ScanQuery.class) - .to(ScanQueryQueryToolChest.class) - .in(LazySingleton.class); - - DruidBinders.queryRunnerFactoryBinder(binder) - .addBinding(ScanQuery.class) - .to(ScanQueryRunnerFactory.class) - .in(LazySingleton.class); - } - - @Override - public List getJacksonModules() - { - return Arrays.asList( - new SimpleModule("ScanQueryDruidModule") - .registerSubtypes( - new NamedType(ScanQuery.class, ScanQuery.SCAN) - ) - ); - } -} diff --git a/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule deleted file mode 100644 index 1459501bf4a8..000000000000 --- a/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ /dev/null @@ -1 +0,0 @@ -io.druid.query.scan.ScanQueryDruidModule \ No newline at end of file diff --git a/pom.xml b/pom.xml index 57d3146ab9b5..d6ebf89433ff 100644 --- a/pom.xml +++ b/pom.xml @@ -134,7 +134,6 @@ extensions-contrib/virtual-columns extensions-contrib/thrift-extensions extensions-contrib/ambari-metrics-emitter - extensions-contrib/scan-query extensions-contrib/sqlserver-metadata-storage extensions-contrib/kafka-emitter extensions-contrib/redis-cache diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java 
index 9e0d8a0f658d..7e537c2a4ad0 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -27,6 +27,7 @@ import io.druid.query.filter.DimFilter; import io.druid.query.groupby.GroupByQuery; import io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.query.scan.ScanQuery; import io.druid.query.search.search.SearchQuery; import io.druid.query.select.SelectQuery; import io.druid.query.spec.QuerySegmentSpec; @@ -46,6 +47,7 @@ @JsonSubTypes.Type(name = Query.SEARCH, value = SearchQuery.class), @JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class), @JsonSubTypes.Type(name = Query.GROUP_BY, value = GroupByQuery.class), + @JsonSubTypes.Type(name = Query.SCAN, value = ScanQuery.class), @JsonSubTypes.Type(name = Query.SEGMENT_METADATA, value = SegmentMetadataQuery.class), @JsonSubTypes.Type(name = Query.SELECT, value = SelectQuery.class), @JsonSubTypes.Type(name = Query.TOPN, value = TopNQuery.class), @@ -58,6 +60,7 @@ public interface Query String SEARCH = "search"; String TIME_BOUNDARY = "timeBoundary"; String GROUP_BY = "groupBy"; + String SCAN = "scan"; String SEGMENT_METADATA = "segmentMetadata"; String SELECT = "select"; String TOPN = "topN"; diff --git a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java index b3e7cb39098f..df6c86c4867f 100644 --- a/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java +++ b/processing/src/main/java/io/druid/query/extraction/ExtractionFn.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.guice.annotations.ExtensionPoint; +import io.druid.java.util.common.Cacheable; import io.druid.query.lookup.LookupExtractionFn; import io.druid.query.lookup.RegisteredLookupExtractionFn; @@ -57,16 +58,8 @@ * regular expression with a capture 
group. When the regular expression matches the value of a dimension, * the value captured by the group is used for grouping operations instead of the dimension value. */ -public interface ExtractionFn +public interface ExtractionFn extends Cacheable { - /** - * Returns a byte[] unique to all concrete implementations of DimExtractionFn. This byte[] is used to - * generate a cache key for the specific query. - * - * @return a byte[] unit to all concrete implements of DimExtractionFn - */ - public byte[] getCacheKey(); - /** * The "extraction" function. This should map an Object into some String value. *

diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/processing/src/main/java/io/druid/query/scan/ScanQuery.java similarity index 75% rename from extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java rename to processing/src/main/java/io/druid/query/scan/ScanQuery.java index 1606ded6156e..e635191153e2 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/io/druid/query/scan/ScanQuery.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.druid.query.BaseQuery; @@ -32,39 +31,45 @@ import io.druid.query.filter.SelectorDimFilter; import io.druid.query.spec.LegacySegmentSpec; import io.druid.query.spec.QuerySegmentSpec; +import io.druid.segment.VirtualColumn; +import io.druid.segment.VirtualColumns; import org.joda.time.Interval; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; -@JsonTypeName("scan") public class ScanQuery extends BaseQuery { - public static final String SCAN = "scan"; public static final String RESULT_FORMAT_LIST = "list"; public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList"; public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector"; + private final VirtualColumns virtualColumns; private final String resultFormat; private final int batchSize; private final long limit; private final DimFilter dimFilter; private final List columns; + private final Boolean legacy; @JsonCreator public ScanQuery( @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, + @JsonProperty("virtualColumns") VirtualColumns virtualColumns, @JsonProperty("resultFormat") String resultFormat, 
@JsonProperty("batchSize") int batchSize, @JsonProperty("limit") long limit, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("columns") List columns, + @JsonProperty("legacy") Boolean legacy, @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, false, context); + this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat; this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize; this.limit = (limit == 0) ? Long.MAX_VALUE : limit; @@ -72,6 +77,13 @@ public ScanQuery( Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0"); this.dimFilter = dimFilter; this.columns = columns; + this.legacy = legacy; + } + + @JsonProperty + public VirtualColumns getVirtualColumns() + { + return virtualColumns; } @JsonProperty @@ -99,6 +111,7 @@ public boolean hasFilters() } @Override + @JsonProperty public DimFilter getFilter() { return dimFilter; @@ -110,16 +123,24 @@ public String getType() return SCAN; } - @JsonProperty("filter") - public DimFilter getDimensionsFilter() + @JsonProperty + public List getColumns() { - return dimFilter; + return columns; } + /** + * Compatibility mode with the legacy scan-query extension. + */ @JsonProperty - public List getColumns() + public Boolean isLegacy() { - return columns; + return legacy; + } + + public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig) + { + return ScanQueryBuilder.copy(this).legacy(legacy != null ? 
legacy : scanQueryConfig.isLegacy()).build(); } @Override @@ -146,7 +167,7 @@ public ScanQuery withDimFilter(DimFilter dimFilter) } @Override - public boolean equals(Object o) + public boolean equals(final Object o) { if (this == o) { return true; @@ -157,49 +178,36 @@ public boolean equals(Object o) if (!super.equals(o)) { return false; } - - ScanQuery that = (ScanQuery) o; - - if (batchSize != that.batchSize) { - return false; - } - if (limit != that.limit) { - return false; - } - if (resultFormat != null ? !resultFormat.equals(that.resultFormat) : that.resultFormat != null) { - return false; - } - if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) { - return false; - } - return columns != null ? columns.equals(that.columns) : that.columns == null; + final ScanQuery scanQuery = (ScanQuery) o; + return batchSize == scanQuery.batchSize && + limit == scanQuery.limit && + Objects.equals(legacy, scanQuery.legacy) /* boxed Boolean: compare by value, not identity */ && + Objects.equals(virtualColumns, scanQuery.virtualColumns) && + Objects.equals(resultFormat, scanQuery.resultFormat) && + Objects.equals(dimFilter, scanQuery.dimFilter) && + Objects.equals(columns, scanQuery.columns); } @Override public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0); - result = 31 * result + batchSize; - result = 31 * result + (int) (limit ^ (limit >>> 32)); - result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); - result = 31 * result + (columns != null ? 
columns.hashCode() : 0); - return result; + return Objects.hash(super.hashCode(), virtualColumns, resultFormat, batchSize, limit, dimFilter, columns, legacy); } @Override public String toString() { return "ScanQuery{" + - "dataSource='" + getDataSource() + '\'' + - ", querySegmentSpec=" + getQuerySegmentSpec() + - ", descending=" + isDescending() + - ", resultFormat='" + resultFormat + '\'' + - ", batchSize=" + batchSize + - ", limit=" + limit + - ", dimFilter=" + dimFilter + - ", columns=" + columns + - '}'; + "dataSource='" + getDataSource() + '\'' + + ", querySegmentSpec=" + getQuerySegmentSpec() + + ", virtualColumns=" + getVirtualColumns() + + ", resultFormat='" + resultFormat + '\'' + + ", batchSize=" + batchSize + + ", limit=" + limit + + ", dimFilter=" + dimFilter + + ", columns=" + columns + + ", legacy=" + legacy + + '}'; } /** @@ -221,23 +229,27 @@ public static class ScanQueryBuilder { private DataSource dataSource; private QuerySegmentSpec querySegmentSpec; + private VirtualColumns virtualColumns; private Map context; private String resultFormat; private int batchSize; private long limit; private DimFilter dimFilter; private List columns; + private Boolean legacy; public ScanQueryBuilder() { dataSource = null; querySegmentSpec = null; + virtualColumns = null; context = null; resultFormat = null; batchSize = 0; limit = 0; dimFilter = null; columns = Lists.newArrayList(); + legacy = null; } public ScanQuery build() @@ -245,11 +257,13 @@ public ScanQuery build() return new ScanQuery( dataSource, querySegmentSpec, + virtualColumns, resultFormat, batchSize, limit, dimFilter, columns, + legacy, context ); } @@ -259,11 +273,13 @@ public static ScanQueryBuilder copy(ScanQuery query) return new ScanQueryBuilder() .dataSource(query.getDataSource()) .intervals(query.getQuerySegmentSpec()) + .virtualColumns(query.getVirtualColumns()) .resultFormat(query.getResultFormat()) .batchSize(query.getBatchSize()) .limit(query.getLimit()) .filters(query.getFilter()) 
.columns(query.getColumns()) + .legacy(query.isLegacy()) .context(query.getContext()); } @@ -297,6 +313,22 @@ public ScanQueryBuilder intervals(List l) return this; } + public ScanQueryBuilder virtualColumns(VirtualColumns virtualColumns) + { + this.virtualColumns = virtualColumns; + return this; + } + + public ScanQueryBuilder virtualColumns(List virtualColumns) + { + return virtualColumns(VirtualColumns.create(virtualColumns)); + } + + public ScanQueryBuilder virtualColumns(VirtualColumn... virtualColumns) + { + return virtualColumns(VirtualColumns.create(Arrays.asList(virtualColumns))); + } + public ScanQueryBuilder context(Map c) { context = c; @@ -350,6 +382,12 @@ public ScanQueryBuilder columns(String... c) columns = Arrays.asList(c); return this; } + + public ScanQueryBuilder legacy(Boolean legacy) + { + this.legacy = legacy; + return this; + } } public static ScanQueryBuilder newScanQueryBuilder() diff --git a/processing/src/main/java/io/druid/query/scan/ScanQueryConfig.java b/processing/src/main/java/io/druid/query/scan/ScanQueryConfig.java new file mode 100644 index 000000000000..da1e9b4d0c1b --- /dev/null +++ b/processing/src/main/java/io/druid/query/scan/ScanQueryConfig.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.scan; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class ScanQueryConfig +{ + @JsonProperty + private boolean legacy = false; + + public boolean isLegacy() + { + return legacy; + } + + public ScanQueryConfig setLegacy(final boolean legacy) + { + this.legacy = legacy; + return this; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ScanQueryConfig that = (ScanQueryConfig) o; + return legacy == that.legacy; + } + + @Override + public int hashCode() + { + return Objects.hash(legacy); + } + + @Override + public String toString() + { + return "ScanQueryConfig{" + + "legacy=" + legacy + + '}'; + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/io/druid/query/scan/ScanQueryEngine.java similarity index 59% rename from extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java rename to processing/src/main/java/io/druid/query/scan/ScanQueryEngine.java index ebaf70906d39..b8e39fb5f8e6 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -20,46 +20,51 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.UOE; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.BaseSequence; import 
io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.ColumnSelectorPlus; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; -import io.druid.query.dimension.DefaultDimensionSpec; -import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.Filter; -import io.druid.query.select.SelectQueryEngine; import io.druid.segment.Cursor; -import io.druid.segment.DimensionHandlerUtils; -import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; -import io.druid.segment.VirtualColumns; +import io.druid.segment.VirtualColumn; import io.druid.segment.column.Column; import io.druid.segment.filter.Filters; import org.joda.time.Interval; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeoutException; public class ScanQueryEngine { - private static final SelectQueryEngine.SelectStrategyFactory STRATEGY_FACTORY = new SelectQueryEngine.SelectStrategyFactory(); + private static final String LEGACY_TIMESTAMP_KEY = "timestamp"; + public Sequence process( final ScanQuery query, final Segment segment, final Map responseContext ) { + // "legacy" should be non-null due to toolChest.mergeResults + final boolean legacy = Preconditions.checkNotNull(query.isLegacy(), "WTF?! 
Expected non-null legacy"); + if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) { long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); if (count >= query.getLimit()) { @@ -77,32 +82,42 @@ public Sequence process( ); } - List allDims = Lists.newLinkedList(adapter.getAvailableDimensions()); - List allMetrics = Lists.newLinkedList(adapter.getAvailableMetrics()); - final List allColumns = Lists.newLinkedList(); + final List allColumns = new ArrayList<>(); + if (query.getColumns() != null && !query.getColumns().isEmpty()) { - if (!query.getColumns().contains(ScanResultValue.timestampKey)) { - allColumns.add(ScanResultValue.timestampKey); + if (legacy && !query.getColumns().contains(LEGACY_TIMESTAMP_KEY)) { + allColumns.add(LEGACY_TIMESTAMP_KEY); } + + // Unless we're in legacy mode, allColumns equals query.getColumns() exactly. This is nice since it makes + // the compactedList form easier to use. allColumns.addAll(query.getColumns()); - allDims.retainAll(query.getColumns()); - allMetrics.retainAll(query.getColumns()); } else { - if (!allDims.contains(ScanResultValue.timestampKey)) { - allColumns.add(ScanResultValue.timestampKey); + final Set availableColumns = Sets.newLinkedHashSet( + Iterables.concat( + Collections.singleton(legacy ? 
LEGACY_TIMESTAMP_KEY : Column.TIME_COLUMN_NAME), + Iterables.transform( + Arrays.asList(query.getVirtualColumns().getVirtualColumns()), + VirtualColumn::getOutputName + ), + adapter.getAvailableDimensions(), + adapter.getAvailableMetrics() + ) + ); + + allColumns.addAll(availableColumns); + + if (legacy) { + allColumns.remove(Column.TIME_COLUMN_NAME); } - allColumns.addAll(allDims); - allColumns.addAll(allMetrics); } - final List dims = DefaultDimensionSpec.toSpec(allDims); - final List metrics = allMetrics; final List intervals = query.getQuerySegmentSpec().getIntervals(); Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals); final String segmentId = segment.getIdentifier(); - final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); + final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getFilter())); if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) { responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L); @@ -113,7 +128,7 @@ public Sequence process( adapter.makeCursors( filter, intervals.get(0), - VirtualColumns.EMPTY, + query.getVirtualColumns(), Granularities.ALL, query.isDescending(), null @@ -129,23 +144,21 @@ public Sequence apply(final Cursor cursor) @Override public Iterator make() { - final LongColumnSelector timestampColumnSelector = - cursor.getColumnSelectorFactory().makeLongColumnSelector(Column.TIME_COLUMN_NAME); - - final List> selectorPlusList = Arrays.asList( - DimensionHandlerUtils.createColumnSelectorPluses( - STRATEGY_FACTORY, - Lists.newArrayList(dims), - cursor.getColumnSelectorFactory() - ) - ); - - final Map metSelectors = Maps.newHashMap(); - for (String metric : metrics) { - final ObjectColumnSelector metricSelector = - cursor.getColumnSelectorFactory().makeObjectColumnSelector(metric); - metSelectors.put(metric, metricSelector); + final List columnSelectors = new 
ArrayList<>(allColumns.size()); + + for (String column : allColumns) { + final ObjectColumnSelector selector; + + if (legacy && column.equals(LEGACY_TIMESTAMP_KEY)) { + selector = cursor.getColumnSelectorFactory() + .makeObjectColumnSelector(Column.TIME_COLUMN_NAME); + } else { + selector = cursor.getColumnSelectorFactory().makeObjectColumnSelector(column); + } + + columnSelectors.add(selector); } + final int batchSize = query.getBatchSize(); return new Iterator() { @@ -163,15 +176,15 @@ public ScanResultValue next() if (hasTimeout && System.currentTimeMillis() >= timeoutAt) { throw new QueryInterruptedException(new TimeoutException()); } - long lastOffset = offset; - Object events = null; - String resultFormat = query.getResultFormat(); - if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) { - throw new UnsupportedOperationException("valueVector is not supported now"); - } else if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) { + final long lastOffset = offset; + final Object events; + final String resultFormat = query.getResultFormat(); + if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) { events = rowsToCompactedList(); - } else { + } else if (ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) { events = rowsToList(); + } else { + throw new UOE("resultFormat[%s] is not supported", resultFormat); } responseContext.put( ScanQueryRunnerFactory.CTX_COUNT, @@ -192,46 +205,48 @@ public void remove() throw new UnsupportedOperationException(); } - private Object rowsToCompactedList() + private List rowsToCompactedList() { - return Lists.transform( - (List>) rowsToList(), - new Function, Object>() - { - @Override - public Object apply(Map input) - { - List eventValues = Lists.newArrayListWithExpectedSize(allColumns.size()); - for (String expectedColumn : allColumns) { - eventValues.add(input.get(expectedColumn)); - } - return eventValues; - } - } - ); + final List events = new ArrayList<>(batchSize); + for (int i = 0; 
!cursor.isDone() + && i < batchSize + && offset < limit; cursor.advance(), i++, offset++) { + final List theEvent = new ArrayList<>(allColumns.size()); + for (int j = 0; j < allColumns.size(); j++) { + theEvent.add(getColumnValue(j)); + } + events.add(theEvent); + } + return events; } - private Object rowsToList() + private List> rowsToList() { List> events = Lists.newArrayListWithCapacity(batchSize); for (int i = 0; !cursor.isDone() - && i < batchSize - && offset < limit; cursor.advance(), i++, offset++) { - final Map theEvent = SelectQueryEngine.singleEvent( - ScanResultValue.timestampKey, - timestampColumnSelector, - selectorPlusList, - metSelectors - ); + && i < batchSize + && offset < limit; cursor.advance(), i++, offset++) { + final Map theEvent = new LinkedHashMap<>(); + for (int j = 0; j < allColumns.size(); j++) { + theEvent.put(allColumns.get(j), getColumnValue(j)); + } events.add(theEvent); } return events; } - private Object rowsToValueVector() + private Object getColumnValue(int i) { - // only support list now, we can support ValueVector or Arrow in future - return rowsToList(); + final ObjectColumnSelector selector = columnSelectors.get(i); + final Object value; + + if (legacy && allColumns.get(i).equals(LEGACY_TIMESTAMP_KEY)) { + value = DateTimes.utc((long) selector.get()); + } else { + value = selector == null ? 
null : selector.get(); + } + + return value; } }; } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java b/processing/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java similarity index 100% rename from extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java rename to processing/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java similarity index 80% rename from extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java rename to processing/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java index 26edb06c9fd4..b5ba83b8bb45 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package io.druid.query.scan; import com.fasterxml.jackson.core.type.TypeReference; @@ -41,11 +42,16 @@ public class ScanQueryQueryToolChest extends QueryToolChest run( final QueryPlus queryPlus, final Map responseContext ) { - ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); + // Ensure "legacy" is a non-null value, such that all other nodes this query is forwarded to will treat it + // the same way, even if they have different default legacy values. 
+ final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig); + final QueryPlus queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); + if (scanQuery.getLimit() == Long.MAX_VALUE) { - return runner.run(queryPlus, responseContext); + return runner.run(queryPlusWithNonNullLegacy, responseContext); } return new BaseSequence<>( new BaseSequence.IteratorMaker() @@ -69,7 +79,7 @@ public Sequence run( @Override public ScanQueryLimitRowIterator make() { - return new ScanQueryLimitRowIterator(runner, queryPlus, responseContext); + return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); } @Override @@ -114,8 +124,8 @@ public Sequence run( ) { ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery(); - if (scanQuery.getDimensionsFilter() != null) { - scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize()); + if (scanQuery.getFilter() != null) { + scanQuery = scanQuery.withDimFilter(scanQuery.getFilter().optimize()); queryPlus = queryPlus.withQuery(scanQuery); } return runner.run(queryPlus, responseContext); diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java similarity index 100% rename from extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java rename to processing/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/processing/src/main/java/io/druid/query/scan/ScanResultValue.java similarity index 98% rename from extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java rename to processing/src/main/java/io/druid/query/scan/ScanResultValue.java index 76d8bf5dba97..c81c9c83dec0 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java +++ 
b/processing/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -25,8 +25,6 @@ public class ScanResultValue implements Comparable { - public static final String timestampKey = "timestamp"; - private final String segmentId; private final List columns; private final Object events; diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java b/processing/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java similarity index 99% rename from extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java rename to processing/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java index f4f121d2ea19..71db550a6d93 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java +++ b/processing/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java @@ -64,6 +64,7 @@ public class MultiSegmentScanQueryTest { private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest( + new ScanQueryConfig(), DefaultGenericQueryMetricsFactory.instance() ); @@ -184,6 +185,7 @@ private ScanQuery.ScanQueryBuilder newBuilder() .intervals(SelectQueryRunnerTest.I_0112_0114) .batchSize(batchSize) .columns(Arrays.asList()) + .legacy(false) .limit(limit); } diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java similarity index 76% rename from extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java rename to processing/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java index ca625ecb36ba..84d5a3e5fb01 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java @@ -20,20 +20,25 @@ package io.druid.query.scan; import com.google.common.base.Function; 
+import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.ObjectArrays; import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; +import io.druid.hll.HyperLogLogCollector; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.Sequences; import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.QueryPlus; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.TableDataSource; +import io.druid.query.expression.TestExprMacroTable; import io.druid.query.extraction.MapLookupExtractor; import io.druid.query.filter.AndDimFilter; import io.druid.query.filter.DimFilter; @@ -41,6 +46,11 @@ import io.druid.query.lookup.LookupExtractionFn; import io.druid.query.spec.LegacySegmentSpec; import io.druid.query.spec.QuerySegmentSpec; +import io.druid.segment.VirtualColumn; +import io.druid.segment.column.Column; +import io.druid.segment.column.ValueType; +import io.druid.segment.virtual.ExpressionVirtualColumn; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -61,6 +71,9 @@ @RunWith(Parameterized.class) public class ScanQueryRunnerTest { + private static final VirtualColumn EXPR_COLUMN = + new ExpressionVirtualColumn("expr", "index * 2", ValueType.LONG, TestExprMacroTable.INSTANCE); + // copied from druid.sample.numeric.tsv public static final String[] V_0112 = { "2011-01-12T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t10000.0\t100000\tpreferred\tapreferred\t100.000000", @@ -99,10 +112,11 @@ public class ScanQueryRunnerTest public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class); private 
static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest( + new ScanQueryConfig(), DefaultGenericQueryMetricsFactory.instance() ); - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "{0}, legacy = {1}") public static Iterable constructorFeeder() throws IOException { return QueryRunnerTestHelper.cartesian( @@ -111,15 +125,18 @@ public static Iterable constructorFeeder() throws IOException toolChest, new ScanQueryEngine() ) - ) + ), + ImmutableList.of(false, true) ); } private final QueryRunner runner; + private final boolean legacy; - public ScanQueryRunnerTest(QueryRunner runner) + public ScanQueryRunnerTest(final QueryRunner runner, final boolean legacy) { this.runner = runner; + this.legacy = legacy; } private ScanQuery.ScanQueryBuilder newTestQuery() @@ -128,14 +145,16 @@ private ScanQuery.ScanQueryBuilder newTestQuery() .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) .columns(Arrays.asList()) .intervals(QueryRunnerTestHelper.fullOnInterval) - .limit(3); + .limit(3) + .legacy(legacy); } @Test public void testFullOnSelect() { List columns = Lists.newArrayList( - ScanResultValue.timestampKey, + getTimestampName(), + "expr", "market", "quality", "qualityLong", @@ -156,6 +175,7 @@ public void testFullOnSelect() ); ScanQuery query = newTestQuery() .intervals(I_0112_0114) + .virtualColumns(EXPR_COLUMN) .build(); HashMap context = new HashMap(); @@ -177,7 +197,8 @@ public void testFullOnSelect() public void testFullOnSelectAsCompactedList() { final List columns = Lists.newArrayList( - ScanResultValue.timestampKey, + getTimestampName(), + "expr", "market", "quality", "qualityLong", @@ -198,6 +219,7 @@ public void testFullOnSelectAsCompactedList() ); ScanQuery query = newTestQuery() .intervals(I_0112_0114) + .virtualColumns(EXPR_COLUMN) .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) .build(); @@ -216,6 +238,56 @@ public void testFullOnSelectAsCompactedList() verify(expectedResults, 
populateNullColumnAtLastForQueryableIndexCase(compactedListToRow(results), "null_column")); } + @Test + public void testSelectWithUnderscoreUnderscoreTime() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .columns(Column.TIME_COLUMN_NAME, QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(QueryPlus.wrap(query), context), + Lists.newArrayList() + ); + + final List>> expectedEvents = toEvents( + new String[]{ + getTimestampName() + ":TIME", + QueryRunnerTestHelper.marketDimension + ":STRING", + null, + null, + null, + null, + null, + null, + null, + QueryRunnerTestHelper.indexMetric + ":DOUBLE" + }, + V_0112_0114 + ); + + // Add "__time" to all the expected events in legacy mode + if (legacy) { + for (List> batch : expectedEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } + + List expectedResults = toExpected( + expectedEvents, + legacy + ? Lists.newArrayList(getTimestampName(), "__time", "market", "index") + : Lists.newArrayList("__time", "market", "index"), + 0, + 3 + ); + verify(expectedResults, results); + } + @Test public void testSelectWithDimsAndMets() { @@ -233,7 +305,7 @@ public void testSelectWithDimsAndMets() List expectedResults = toExpected( toEvents( new String[]{ - ScanResultValue.timestampKey + ":TIME", + legacy ? getTimestampName() + ":TIME" : null, QueryRunnerTestHelper.marketDimension + ":STRING", null, null, @@ -246,7 +318,7 @@ public void testSelectWithDimsAndMets() }, V_0112_0114 ), - Lists.newArrayList(ScanResultValue.timestampKey, "market", "index"), + legacy ? 
Lists.newArrayList(getTimestampName(), "market", "index") : Lists.newArrayList("market", "index"), 0, 3 ); @@ -271,7 +343,7 @@ public void testSelectWithDimsAndMetsAsCompactedList() List expectedResults = toExpected( toEvents( new String[]{ - ScanResultValue.timestampKey + ":TIME", + legacy ? getTimestampName() + ":TIME" : null, QueryRunnerTestHelper.marketDimension + ":STRING", null, null, @@ -284,7 +356,7 @@ public void testSelectWithDimsAndMetsAsCompactedList() }, V_0112_0114 ), - Lists.newArrayList(ScanResultValue.timestampKey, "market", "index"), + legacy ? Lists.newArrayList(getTimestampName(), "market", "index") : Lists.newArrayList("market", "index"), 0, 3 ); @@ -311,7 +383,7 @@ public void testFullOnSelectWithFilterAndLimit() final List>> events = toEvents( new String[]{ - ScanResultValue.timestampKey + ":TIME", + legacy ? getTimestampName() + ":TIME" : null, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -345,7 +417,7 @@ public void testFullOnSelectWithFilterAndLimit() List expectedResults = toExpected( events, - Lists.newArrayList(ScanResultValue.timestampKey, "quality", "index"), + legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), 0, limit ); @@ -378,7 +450,7 @@ public void testSelectWithFilterLookupExtractionFn() final List>> events = toEvents( new String[]{ - ScanResultValue.timestampKey + ":TIME", + legacy ? getTimestampName() + ":TIME" : null, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -398,7 +470,14 @@ public void testSelectWithFilterLookupExtractionFn() List expectedResults = toExpected( events, - Lists.newArrayList(ScanResultValue.timestampKey, QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric), + legacy ? 
Lists.newArrayList( + getTimestampName(), + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) : Lists.newArrayList( + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ), 0, 3 ); @@ -446,18 +525,17 @@ public void testFullSelectNoDimensionAndMetric() ); final List>> events = toEvents( - new String[]{ - ScanResultValue.timestampKey + ":TIME" - }, + legacy ? new String[]{getTimestampName() + ":TIME"} : new String[0], V_0112_0114 ); List expectedResults = toExpected( events, - Lists.newArrayList(ScanResultValue.timestampKey, "foo", "foo2"), + legacy ? Lists.newArrayList(getTimestampName(), "foo", "foo2") : Lists.newArrayList("foo", "foo2"), 0, 3 ); + verify(expectedResults, results); } @@ -465,7 +543,7 @@ private List>> toFullEvents(final String[]... valueSet) { return toEvents( new String[]{ - ScanResultValue.timestampKey + ":TIME", + getTimestampName() + ":TIME", QueryRunnerTestHelper.marketDimension + ":STRING", QueryRunnerTestHelper.qualityDimension + ":STRING", "qualityLong" + ":LONG", @@ -475,7 +553,14 @@ private List>> toFullEvents(final String[]... valueSet) QueryRunnerTestHelper.placementDimension + ":STRING", QueryRunnerTestHelper.placementishDimension + ":STRINGS", QueryRunnerTestHelper.indexMetric + ":DOUBLE", - QueryRunnerTestHelper.partialNullDimension + ":STRING" + QueryRunnerTestHelper.partialNullDimension + ":STRING", + "expr", + "indexMin", + "indexFloat", + "indexMaxPlusTen", + "indexMinFloat", + "indexMaxFloat", + "quality_uniques" }, valueSet ); @@ -499,14 +584,53 @@ public Map apply(String input) Map event = Maps.newHashMap(); String[] values = input.split("\\t"); for (int i = 0; i < dimSpecs.length; i++) { - if (dimSpecs[i] == null || i >= dimSpecs.length || i >= values.length) { + if (dimSpecs[i] == null || i >= dimSpecs.length) { + continue; + } + + // For testing metrics and virtual columns we have some special handling here, since + // they don't appear in the source data. 
+ if (dimSpecs[i].equals(EXPR_COLUMN.getOutputName())) { + event.put( + EXPR_COLUMN.getOutputName(), + (double) event.get(QueryRunnerTestHelper.indexMetric) * 2 + ); + continue; + } else if (dimSpecs[i].equals("indexMin")) { + event.put("indexMin", (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexFloat")) { + event.put("indexFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexMaxPlusTen")) { + event.put("indexMaxPlusTen", (double) event.get(QueryRunnerTestHelper.indexMetric) + 10); continue; + } else if (dimSpecs[i].equals("indexMinFloat")) { + event.put("indexMinFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexMaxFloat")) { + event.put("indexMaxFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("quality_uniques")) { + final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + collector.add( + Hashing.murmur3_128() + .hashBytes(StringUtils.toUtf8((String) event.get("quality"))) + .asBytes() + ); + event.put("quality_uniques", collector); } + + if (i >= values.length) { + continue; + } + String[] specs = dimSpecs[i].split(":"); + event.put( specs[0], specs.length == 1 || specs[1].equals("STRING") ? values[i] : - specs[1].equals("TIME") ? DateTimes.of(values[i]) : + specs[1].equals("TIME") ? toTimestamp(values[i]) : specs[1].equals("FLOAT") ? Float.valueOf(values[i]) : specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) : specs[1].equals("LONG") ? Long.valueOf(values[i]) : @@ -524,6 +648,20 @@ public Map apply(String input) return events; } + private Object toTimestamp(final String value) + { + if (legacy) { + return DateTimes.of(value); + } else { + return DateTimes.of(value).getMillis(); + } + } + + private String getTimestampName() + { + return legacy ? 
"timestamp" : Column.TIME_COLUMN_NAME; + } + private List toExpected( List>> targets, List columns, @@ -578,9 +716,23 @@ private static void verify( for (Map.Entry ex : exHolder.entrySet()) { Object actVal = acHolder.get(ex.getKey()); - // work around for current II limitations + if (actVal instanceof String[]) { + actVal = Arrays.asList((String[]) actVal); + } + Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal); } + + for (Map.Entry ac : acHolder.entrySet()) { + Object exVal = exHolder.get(ac.getKey()); + Object actVal = ac.getValue(); + + if (actVal instanceof String[]) { + actVal = Arrays.asList((String[]) actVal); + } + + Assert.assertEquals("invalid value for " + ac.getKey(), exVal, actVal); + } } if (actualEvts.hasNext()) { @@ -612,7 +764,7 @@ private static Iterable populateNullColumnAtLastForQueryableInd private Iterable compactedListToRow(Iterable results) { - return Iterables.transform(results, new Function() + return Lists.newArrayList(Iterables.transform(results, new Function() { @Override public ScanResultValue apply(ScanResultValue input) @@ -629,6 +781,6 @@ public ScanResultValue apply(ScanResultValue input) } return new ScanResultValue(input.getSegmentId(), input.getColumns(), mapEvents); } - }); + })); } } diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java b/processing/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java similarity index 95% rename from extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java rename to processing/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java index 8567b62f56fa..774a3b798a6e 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java +++ b/processing/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java @@ -25,6 +25,7 @@ import io.druid.query.QueryRunnerTestHelper; import io.druid.query.TableDataSource; import io.druid.query.spec.LegacySegmentSpec; 
+import io.druid.segment.VirtualColumns; import org.junit.Assert; import org.junit.Test; @@ -48,22 +49,26 @@ public void testSerializationLegacyString() throws Exception String current = "{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"}," + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]}," + + "\"virtualColumns\":[]," + "\"resultFormat\":\"list\"," + "\"batchSize\":20480," + "\"limit\":3," + "\"filter\":null," + "\"columns\":[\"market\",\"quality\",\"index\"]," + + "\"legacy\":null," + "\"context\":null," + "\"descending\":false}"; ScanQuery query = new ScanQuery( new TableDataSource(QueryRunnerTestHelper.dataSource), new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")), + VirtualColumns.EMPTY, null, 0, 3, null, Arrays.asList("market", "quality", "index"), + null, null ); diff --git a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java index 8a982601fec5..30c843814567 100644 --- a/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java +++ b/server/src/main/java/io/druid/guice/QueryRunnerFactoryModule.java @@ -32,6 +32,8 @@ import io.druid.query.groupby.GroupByQueryRunnerFactory; import io.druid.query.metadata.SegmentMetadataQueryRunnerFactory; import io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.query.scan.ScanQuery; +import io.druid.query.scan.ScanQueryRunnerFactory; import io.druid.query.search.SearchQueryRunnerFactory; import io.druid.query.search.search.SearchQuery; import io.druid.query.select.SelectQuery; @@ -57,6 +59,7 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule .put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class) .put(SegmentMetadataQuery.class, SegmentMetadataQueryRunnerFactory.class) .put(GroupByQuery.class, GroupByQueryRunnerFactory.class) + .put(ScanQuery.class, 
ScanQueryRunnerFactory.class) .put(SelectQuery.class, SelectQueryRunnerFactory.class) .put(TopNQuery.class, TopNQueryRunnerFactory.class) .put(DataSourceMetadataQuery.class, DataSourceMetadataQueryRunnerFactory.class) diff --git a/server/src/main/java/io/druid/guice/QueryToolChestModule.java b/server/src/main/java/io/druid/guice/QueryToolChestModule.java index 3e4da18bb4d6..5584a147707d 100644 --- a/server/src/main/java/io/druid/guice/QueryToolChestModule.java +++ b/server/src/main/java/io/druid/guice/QueryToolChestModule.java @@ -40,6 +40,9 @@ import io.druid.query.metadata.SegmentMetadataQueryConfig; import io.druid.query.metadata.SegmentMetadataQueryQueryToolChest; import io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.query.scan.ScanQuery; +import io.druid.query.scan.ScanQueryConfig; +import io.druid.query.scan.ScanQueryQueryToolChest; import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.search.SearchQuery; import io.druid.query.search.search.SearchQueryConfig; @@ -76,6 +79,7 @@ public class QueryToolChestModule implements Module .put(TimeBoundaryQuery.class, TimeBoundaryQueryQueryToolChest.class) .put(SegmentMetadataQuery.class, SegmentMetadataQueryQueryToolChest.class) .put(GroupByQuery.class, GroupByQueryQueryToolChest.class) + .put(ScanQuery.class, ScanQueryQueryToolChest.class) .put(SelectQuery.class, SelectQueryQueryToolChest.class) .put(TopNQuery.class, TopNQueryQueryToolChest.class) .put(DataSourceMetadataQuery.class, DataSourceQueryQueryToolChest.class) @@ -98,6 +102,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.segmentMetadata", SegmentMetadataQueryConfig.class); JsonConfigProvider.bind(binder, "druid.query.select", SelectQueryConfig.class); + JsonConfigProvider.bind(binder, "druid.query.scan", ScanQueryConfig.class); PolyBind.createChoice( binder, diff --git 
a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java index c69326ef72df..f4c6f3de3937 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryBuilder.java @@ -21,9 +21,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import io.druid.java.util.common.ISE; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.granularity.Granularity; @@ -38,6 +40,7 @@ import io.druid.query.groupby.having.DimFilterHavingSpec; import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.query.groupby.orderby.OrderByColumnSpec; +import io.druid.query.scan.ScanQuery; import io.druid.query.select.PagingSpec; import io.druid.query.select.SelectQuery; import io.druid.query.timeseries.TimeseriesQuery; @@ -341,7 +344,7 @@ public RelTrait[] getRelTraits() /** * Return this query as some kind of Druid query. The returned query will either be {@link TopNQuery}, - * {@link TimeseriesQuery}, {@link GroupByQuery}, or {@link SelectQuery}. + * {@link TimeseriesQuery}, {@link GroupByQuery}, {@link ScanQuery}, or {@link SelectQuery}. 
* * @param dataSource data source to query * @param plannerContext planner context @@ -380,6 +383,11 @@ public Query toQuery( return groupByQuery; } + final ScanQuery scanQuery = toScanQuery(dataSource, plannerContext); + if (scanQuery != null) { + return scanQuery; + } + final SelectQuery selectQuery = toSelectQuery(dataSource, plannerContext); if (selectQuery != null) { return selectQuery; @@ -583,6 +591,56 @@ public GroupByQuery toGroupByQuery( ); } + /** + * Return this query as a Scan query, or null if this query is not compatible with Scan. + * + * @param dataSource data source to query + * @param plannerContext planner context + * + * @return query or null + */ + @Nullable + public ScanQuery toScanQuery( + final DataSource dataSource, + final PlannerContext plannerContext + ) + { + if (grouping != null) { + // Scan cannot GROUP BY. + return null; + } + + if (limitSpec != null && limitSpec.getColumns().size() > 0) { + // Scan cannot ORDER BY. + return null; + } + + if (getRowOrder().isEmpty()) { + // Should never do a scan query without any columns that we're interested in. This is probably a planner bug. + throw new ISE("WTF?! Attempting to convert to Scan query without any columns?"); + } + + final Filtration filtration = Filtration.create(filter).optimize(sourceRowSignature); + + // DefaultLimitSpec (which we use to "remember" limits) is int typed, and Integer.MAX_VALUE means "no limit". + final long scanLimit = limitSpec == null || limitSpec.getLimit() == Integer.MAX_VALUE + ? 0L + : (long) limitSpec.getLimit(); + + return new ScanQuery( + dataSource, + filtration.getQuerySegmentSpec(), + selectProjection != null ? 
VirtualColumns.create(selectProjection.getVirtualColumns()) : VirtualColumns.EMPTY, + ScanQuery.RESULT_FORMAT_COMPACTED_LIST, + 0, + scanLimit, + filtration.getDimFilter(), + Ordering.natural().sortedCopy(ImmutableSet.copyOf(getRowOrder())), + false, + plannerContext.getQueryContext() + ); + } + /** * Return this query as a Select query, or null if this query is not compatible with Select. * diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index 57ce93a33103..6a4bac009bf6 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import io.druid.data.input.Row; import io.druid.java.util.common.DateTimes; @@ -35,6 +36,7 @@ import io.druid.query.Query; import io.druid.query.Result; import io.druid.query.groupby.GroupByQuery; +import io.druid.query.scan.ScanQuery; import io.druid.query.select.EventHolder; import io.druid.query.select.PagingSpec; import io.druid.query.select.SelectQuery; @@ -104,6 +106,8 @@ public Sequence runQuery( return executeTopN(queryBuilder, (TopNQuery) query); } else if (query instanceof GroupByQuery) { return executeGroupBy(queryBuilder, (GroupByQuery) query); + } else if (query instanceof ScanQuery) { + return executeScan(queryBuilder, (ScanQuery) query); } else if (query instanceof SelectQuery) { return executeSelect(queryBuilder, (SelectQuery) query); } else { @@ -111,6 +115,50 @@ public Sequence runQuery( } } + private Sequence executeScan( + final DruidQueryBuilder queryBuilder, + final ScanQuery query + ) + { + final List fieldList = queryBuilder.getRowType().getFieldList(); + + // SQL row column index -> Scan query column index + final int[] 
columnMapping = new int[queryBuilder.getRowOrder().size()]; + final Map scanColumnOrder = Maps.newHashMap(); + + for (int i = 0; i < query.getColumns().size(); i++) { + scanColumnOrder.put(query.getColumns().get(i), i); + } + + for (int i = 0; i < queryBuilder.getRowOrder().size(); i++) { + final Integer index = scanColumnOrder.get(queryBuilder.getRowOrder().get(i)); + columnMapping[i] = index == null ? -1 : index; + } + + return Sequences.concat( + Sequences.map( + runQuery(query), + scanResult -> { + final List retVals = new ArrayList<>(); + final List> rows = (List>) scanResult.getEvents(); + + for (List row : rows) { + final Object[] retVal = new Object[fieldList.size()]; + for (RelDataTypeField field : fieldList) { + retVal[field.getIndex()] = coerce( + row.get(columnMapping[field.getIndex()]), + field.getType().getSqlTypeName() + ); + } + retVals.add(retVal); + } + + return Sequences.simple(retVals); + } + ) + ); + } + private Sequence executeSelect( final DruidQueryBuilder queryBuilder, final SelectQuery baseQuery @@ -169,7 +217,6 @@ public Sequence apply(final Result result) pagingIdentifiers.set(result.getValue().getPagingIdentifiers()); final List retVals = new ArrayList<>(); - for (EventHolder holder : result.getValue().getEvents()) { morePages.set(true); final Map map = holder.getEvent(); diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java index 22f6bdb33a9f..0189935754d3 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java @@ -76,7 +76,7 @@ public void onMatch(RelOptRuleCall call) final Project project = call.rel(0); final DruidRel druidRel = call.rel(1); - // Only push in projections that can be used by the Select query. + // Only push in projections that can be used by the Scan or Select queries. 
// Leave anything more complicated to DruidAggregateProjectRule for possible handling in a GroupBy query. final RowSignature sourceRowSignature = druidRel.getSourceRowSignature(); diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index e7046e066a4d..52975b80cf67 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -75,6 +75,7 @@ import io.druid.query.lookup.RegisteredLookupExtractionFn; import io.druid.query.ordering.StringComparator; import io.druid.query.ordering.StringComparators; +import io.druid.query.scan.ScanQuery; import io.druid.query.select.PagingSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.QuerySegmentSpec; @@ -137,14 +138,6 @@ public boolean isUseApproximateCountDistinct() return false; } }; - private static final PlannerConfig PLANNER_CONFIG_SELECT_PAGING = new PlannerConfig() - { - @Override - public int getSelectThreshold() - { - return 2; - } - }; private static final PlannerConfig PLANNER_CONFIG_FALLBACK = new PlannerConfig() { @Override @@ -404,31 +397,14 @@ public void testSelectStar() throws Exception { testQuery( "SELECT * FROM druid.foo", - ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .pagingSpec(FIRST_PAGING_SPEC) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .pagingSpec( - new PagingSpec( - 
ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 5), - 1000, - true - ) - ) - .context(QUERY_CONTEXT_DEFAULT) - .build() + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{T("2000-01-01"), 1L, "", "a", 1f, 1.0, HLLCV1.class.getName()}, @@ -469,7 +445,7 @@ public void testExplainSelectStar() throws Exception ImmutableList.of(), ImmutableList.of( new Object[]{ - "DruidQueryRel(query=[{\"queryType\":\"select\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dummy\",\"outputName\":\"dummy\",\"outputType\":\"STRING\"}],\"metrics\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"virtualColumns\":[],\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":0,\"fromNext\":true},\"context\":{\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807}}])\n" + 
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807},\"descending\":false}])\n" } ) ); @@ -481,15 +457,14 @@ public void testSelectStarWithLimit() throws Exception testQuery( "SELECT * FROM druid.foo LIMIT 2", ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .pagingSpec(FIRST_PAGING_SPEC) - .context(QUERY_CONTEXT_DEFAULT) - .build() + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") + .limit(2) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{T("2000-01-01"), 1L, "", "a", 1.0f, 1.0, HLLCV1.class.getName()}, @@ -499,7 +474,32 @@ public void testSelectStarWithLimit() throws Exception } @Test - public void testSelectStarWithLimitDescending() throws Exception + public void testSelectWithProjection() throws Exception + { + testQuery( + "SELECT SUBSTRING(dim2, 1, 1) FROM druid.foo LIMIT 2", + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .virtualColumns( + EXPRESSION_VIRTUAL_COLUMN("v0", "substring(\"dim2\", 0, 1)", 
ValueType.STRING) + ) + .columns("v0") + .limit(2) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"a"}, + new Object[]{""} + ) + ); + } + + @Test + public void testSelectStarWithLimitTimeDescending() throws Exception { testQuery( "SELECT * FROM druid.foo ORDER BY __time DESC LIMIT 2", @@ -523,22 +523,64 @@ public void testSelectStarWithLimitDescending() throws Exception } @Test - public void testSelectSingleColumnTwice() throws Exception + public void testSelectStarWithoutLimitTimeAscending() throws Exception { testQuery( - "SELECT dim2 x, dim2 y FROM druid.foo LIMIT 2", + "SELECT * FROM druid.foo ORDER BY __time", ImmutableList.of( Druids.newSelectQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) .intervals(QSS(Filtration.eternity())) .granularity(Granularities.ALL) - .descending(false) .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("dim2")) + .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) + .descending(false) .pagingSpec(FIRST_PAGING_SPEC) .context(QUERY_CONTEXT_DEFAULT) + .build(), + Druids.newSelectQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .granularity(Granularities.ALL) + .dimensions(ImmutableList.of("dummy")) + .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) + .descending(false) + .pagingSpec( + new PagingSpec( + ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 5), + 1000, + true + ) + ) + .context(QUERY_CONTEXT_DEFAULT) .build() ), + ImmutableList.of( + new Object[]{T("2000-01-01"), 1L, "", "a", 1f, 1.0, HLLCV1.class.getName()}, + new Object[]{T("2000-01-02"), 1L, "10.1", "", 2f, 2.0, HLLCV1.class.getName()}, + new Object[]{T("2000-01-03"), 1L, "2", "", 3f, 3.0, HLLCV1.class.getName()}, + new Object[]{T("2001-01-01"), 1L, "1", "a", 4f, 4.0, HLLCV1.class.getName()}, + new 
Object[]{T("2001-01-02"), 1L, "def", "abc", 5f, 5.0, HLLCV1.class.getName()}, + new Object[]{T("2001-01-03"), 1L, "abc", "", 6f, 6.0, HLLCV1.class.getName()} + ) + ); + } + + @Test + public void testSelectSingleColumnTwice() throws Exception + { + testQuery( + "SELECT dim2 x, dim2 y FROM druid.foo LIMIT 2", + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .columns("dim2") + .limit(2) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), ImmutableList.of( new Object[]{"a", "a"}, new Object[]{"", ""} @@ -619,57 +661,22 @@ public void testSelfJoinWithFallback() throws Exception + " druid.foo x INNER JOIN druid.foo y ON x.dim1 = y.dim2\n" + "WHERE\n" + " x.dim1 <> ''", - ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .pagingSpec(FIRST_PAGING_SPEC) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .pagingSpec( - new PagingSpec( - ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 5), - 1000, - true - ) - ) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .filters(NOT(SELECTOR("dim1", "", null))) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .pagingSpec(FIRST_PAGING_SPEC) - 
.context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .filters(NOT(SELECTOR("dim1", "", null))) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .pagingSpec( - new PagingSpec( - ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 4), - 1000, - true - ) - ) - .context(QUERY_CONTEXT_DEFAULT) - .build() + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build(), + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .filters(NOT(SELECTOR("dim1", "", null))) + .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{"abc", "def", "abc"} @@ -693,8 +700,8 @@ public void testExplainSelfJoinWithFallback() throws Exception new Object[]{ "BindableProject(dim1=[$9], dim10=[$2], dim2=[$3])\n" + " BindableJoin(condition=[=($9, $3)], joinType=[inner])\n" - + " 
DruidQueryRel(query=[{\"queryType\":\"select\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"filter\":null,\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dummy\",\"outputName\":\"dummy\",\"outputType\":\"STRING\"}],\"metrics\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"virtualColumns\":[],\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":0,\"fromNext\":true},\"context\":{\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807}}])\n" - + " DruidQueryRel(query=[{\"queryType\":\"select\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"\",\"extractionFn\":null}},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dummy\",\"outputName\":\"dummy\",\"outputType\":\"STRING\"}],\"metrics\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"virtualColumns\":[],\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":0,\"fromNext\":true},\"context\":{\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807}}])\n" + + " 
DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807},\"descending\":false}])\n" + + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"\",\"extractionFn\":null}},\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807},\"descending\":false}])\n" } ) ); @@ -1315,116 +1322,20 @@ public void testSelectStarWithDimFilter() throws Exception { testQuery( "SELECT * FROM druid.foo WHERE dim1 > 'd' OR dim2 = 'a'", - ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .pagingSpec(FIRST_PAGING_SPEC) - .filters( - OR( - BOUND("dim1", "d", null, true, false, null, StringComparators.LEXICOGRAPHIC), - SELECTOR("dim2", "a", null) - ) - ) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - 
Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .pagingSpec( - new PagingSpec( - ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 2), - 1000, - true - ) - ) - .filters( - OR( - BOUND("dim1", "d", null, true, false, null, StringComparators.LEXICOGRAPHIC), - SELECTOR("dim2", "a", null) - ) - ) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .context(QUERY_CONTEXT_DEFAULT) - .build() - ), - ImmutableList.of( - new Object[]{T("2000-01-01"), 1L, "", "a", 1.0f, 1d, HLLCV1.class.getName()}, - new Object[]{T("2001-01-01"), 1L, "1", "a", 4.0f, 4d, HLLCV1.class.getName()}, - new Object[]{T("2001-01-02"), 1L, "def", "abc", 5.0f, 5d, HLLCV1.class.getName()} - ) - ); - } - - @Test - public void testSelectStarWithDimFilterAndPaging() throws Exception - { - testQuery( - PLANNER_CONFIG_SELECT_PAGING, - "SELECT * FROM druid.foo WHERE dim1 > 'd' OR dim2 = 'a'", - ImmutableList.of( - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .pagingSpec(new PagingSpec(null, 2, true)) - .filters( - OR( - BOUND("dim1", "d", null, true, false, null, StringComparators.LEXICOGRAPHIC), - SELECTOR("dim2", "a", null) - ) - ) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .pagingSpec( - new PagingSpec( - ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 1), - 2, - true - ) - ) - .filters( - OR( - BOUND("dim1", "d", null, true, false, null, StringComparators.LEXICOGRAPHIC), - SELECTOR("dim2", "a", 
null) - ) - ) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .pagingSpec( - new PagingSpec( - ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 2), - 2, - true - ) - ) - .filters( - OR( - BOUND("dim1", "d", null, true, false, null, StringComparators.LEXICOGRAPHIC), - SELECTOR("dim2", "a", null) - ) - ) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")) - .context(QUERY_CONTEXT_DEFAULT) - .build() + ImmutableList.of( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .filters( + OR( + BOUND("dim1", "d", null, true, false, null, StringComparators.LEXICOGRAPHIC), + SELECTOR("dim2", "a", null) + ) + ) + .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{T("2000-01-01"), 1L, "", "a", 1.0f, 1.0d, HLLCV1.class.getName()}, @@ -5458,42 +5369,14 @@ public void testUsingSubqueryAsFilterOnTwoColumns() throws Exception .setHavingSpec(new DimFilterHavingSpec(NUMERIC_SELECTOR("a0", "1", null))) .setContext(QUERY_CONTEXT_DEFAULT) .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .dimensionSpecs(DIMS( - new DefaultDimensionSpec("dim1", "d1"), - new DefaultDimensionSpec("dim2", "d2") - )) - .metrics(ImmutableList.of("cnt")) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .filters(AND(SELECTOR("dim1", "def", null), SELECTOR("dim2", "abc", null))) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", 
"dim2")) - .pagingSpec(FIRST_PAGING_SPEC) - .context(QUERY_CONTEXT_DEFAULT) - .build(), - Druids.newSelectQueryBuilder() - .dataSource(CalciteTests.DATASOURCE1) - .dimensionSpecs(DIMS( - new DefaultDimensionSpec("dim1", "d1"), - new DefaultDimensionSpec("dim2", "d2") - )) - .metrics(ImmutableList.of("cnt")) - .intervals(QSS(Filtration.eternity())) - .granularity(Granularities.ALL) - .filters(AND(SELECTOR("dim1", "def", null), SELECTOR("dim2", "abc", null))) - .dimensions(ImmutableList.of("dummy")) - .metrics(ImmutableList.of("__time", "cnt", "dim1", "dim2")) - .pagingSpec( - new PagingSpec( - ImmutableMap.of("foo_1970-01-01T00:00:00.000Z_2001-01-03T00:00:00.001Z_1", 0), - 1000, - true - ) - ) - .context(QUERY_CONTEXT_DEFAULT) - .build() + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .filters(AND(SELECTOR("dim1", "def", null), SELECTOR("dim2", "abc", null))) + .columns("__time", "cnt", "dim1", "dim2") + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() ), ImmutableList.of( new Object[]{T("2001-01-02"), 1L, "def", "abc"} @@ -5813,4 +5696,9 @@ private static ExpressionPostAggregator EXPRESSION_POST_AGG(final String name, f { return new ExpressionPostAggregator(name, expression, null, CalciteTests.createExprMacroTable()); } + + private static ScanQuery.ScanQueryBuilder newScanQueryBuilder() + { + return new ScanQuery.ScanQueryBuilder().legacy(false); + } } diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 27908c311e0b..5e73d86e2203 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -67,6 +67,11 @@ import io.druid.query.metadata.SegmentMetadataQueryQueryToolChest; import io.druid.query.metadata.SegmentMetadataQueryRunnerFactory; import 
io.druid.query.metadata.metadata.SegmentMetadataQuery; +import io.druid.query.scan.ScanQuery; +import io.druid.query.scan.ScanQueryConfig; +import io.druid.query.scan.ScanQueryEngine; +import io.druid.query.scan.ScanQueryQueryToolChest; +import io.druid.query.scan.ScanQueryRunnerFactory; import io.druid.query.select.SelectQuery; import io.druid.query.select.SelectQueryConfig; import io.druid.query.select.SelectQueryEngine; @@ -156,6 +161,16 @@ public void configure(final Binder binder) QueryRunnerTestHelper.NOOP_QUERYWATCHER ) ) + .put( + ScanQuery.class, + new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory(TestHelper.getJsonMapper()) + ), + new ScanQueryEngine() + ) + ) .put( SelectQuery.class, new SelectQueryRunnerFactory(