diff --git a/distribution/pom.xml b/distribution/pom.xml
index 8908e0332a4d..044728d62911 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -104,6 +104,8 @@
                         <argument>io.druid.extensions:mysql-metadata-storage</argument>
                         <argument>-c</argument>
                         <argument>io.druid.extensions:postgresql-metadata-storage</argument>
+                        <argument>-c</argument>
+                        <argument>io.druid.extensions.contrib:scan-query</argument>
                         <argument>${druid.distribution.pulldeps.opts}</argument>
diff --git a/docs/content/development/extensions-contrib/scan-query.md b/docs/content/development/extensions-contrib/scan-query.md
new file mode 100644
index 000000000000..3eef162b254e
--- /dev/null
+++ b/docs/content/development/extensions-contrib/scan-query.md
@@ -0,0 +1,157 @@
+---
+layout: doc_page
+---
+
+# Scan query
+The Scan query returns raw Druid rows in streaming mode.
+
+```json
+ {
+   "queryType": "scan",
+   "dataSource": "wikipedia",
+   "resultFormat": "list",
+   "columns":[],
+   "intervals": [
+     "2013-01-01/2013-01-02"
+   ],
+   "batchSize":20480,
+   "limit":5
+ }
+```
+
+There are several main parts to a scan query:
+
+|property|description|required?|
+|--------|-----------|---------|
+|queryType|This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query.|yes|
+|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
+|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
+|resultFormat|How the results are represented: `list`, `compactedList`, or `valueVector`. Currently only `list` and `compactedList` are supported. Default is `list`.|no|
+|filter|See [Filters](../querying/filters.html).|no|
+|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
+|batchSize|How many rows to buffer before returning a batch to the client. Default is `20480`.|no|
+|limit|How many rows to return. If not specified, all rows will be returned.|no|
+|context|An additional JSON Object which can be used to specify certain flags.|no|
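+
+For JVM clients, the same query can be built programmatically with the `ScanQueryBuilder` this patch adds; a minimal sketch (the builder methods used here are the ones defined on `ScanQuery.ScanQueryBuilder` below):
+
+```java
+// Programmatic equivalent of the JSON query above. Leaving columns()
+// unset scans every dimension and metric, matching the empty "columns"
+// array in the JSON form.
+ScanQuery query = ScanQuery.newScanQueryBuilder()
+    .dataSource("wikipedia")                      // datasource to scan
+    .intervals("2013-01-01/2013-01-02")           // ISO-8601 interval
+    .resultFormat(ScanQuery.RESULT_FORMAT_LIST)   // or RESULT_FORMAT_COMPACTED_LIST
+    .batchSize(20480)                             // rows buffered per returned batch
+    .limit(5)                                     // maximum rows to return
+    .build();
+```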
+
+The format of the result when resultFormat equals `list`:
+
+```json
+ [{
+    "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
+    "columns" : [
+      "timestamp",
+      "robot",
+      "namespace",
+      "anonymous",
+      "unpatrolled",
+      "page",
+      "language",
+      "newpage",
+      "user",
+      "count",
+      "added",
+      "delta",
+      "variation",
+      "deleted"
+    ],
+    "events" : [ {
+      "timestamp" : "2013-01-01T00:00:00.000Z",
+      "robot" : "1",
+      "namespace" : "article",
+      "anonymous" : "0",
+      "unpatrolled" : "0",
+      "page" : "11._korpus_(NOVJ)",
+      "language" : "sl",
+      "newpage" : "0",
+      "user" : "EmausBot",
+      "count" : 1.0,
+      "added" : 39.0,
+      "delta" : 39.0,
+      "variation" : 39.0,
+      "deleted" : 0.0
+    }, {
+      "timestamp" : "2013-01-01T00:00:00.000Z",
+      "robot" : "0",
+      "namespace" : "article",
+      "anonymous" : "0",
+      "unpatrolled" : "0",
+      "page" : "112_U.S._580",
+      "language" : "en",
+      "newpage" : "1",
+      "user" : "MZMcBride",
+      "count" : 1.0,
+      "added" : 70.0,
+      "delta" : 70.0,
+      "variation" : 70.0,
+      "deleted" : 0.0
+    }, {
+      "timestamp" : "2013-01-01T00:00:00.000Z",
+      "robot" : "0",
+      "namespace" : "article",
+      "anonymous" : "0",
+      "unpatrolled" : "0",
+      "page" : "113_U.S._243",
+      "language" : "en",
+      "newpage" : "1",
+      "user" : "MZMcBride",
+      "count" : 1.0,
+      "added" : 77.0,
+      "delta" : 77.0,
+      "variation" : 77.0,
+      "deleted" : 0.0
+    }, {
+      "timestamp" : "2013-01-01T00:00:00.000Z",
+      "robot" : "0",
+      "namespace" : "article",
+      "anonymous" : "0",
+      "unpatrolled" : "0",
+      "page" : "113_U.S._73",
+      "language" : "en",
+      "newpage" : "1",
+      "user" : "MZMcBride",
+      "count" : 1.0,
+      "added" : 70.0,
+      "delta" : 70.0,
+      "variation" : 70.0,
+      "deleted" : 0.0
+    }, {
+      "timestamp" : "2013-01-01T00:00:00.000Z",
+      "robot" : "0",
+      "namespace" : "article",
+      "anonymous" : "0",
+      "unpatrolled" : "0",
+      "page" : "113_U.S._756",
+      "language" : "en",
+      "newpage" : "1",
+      "user" : "MZMcBride",
+      "count" : 1.0,
+      "added" : 68.0,
+      "delta" : 68.0,
+      "variation" : 68.0,
+      "deleted" : 0.0
+    } ]
+} ]
+```
+
+The format of the result when resultFormat equals `compactedList`:
+
+```json
+ [{
+    "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
+    "columns" : [
+      "timestamp", "robot", "namespace", "anonymous", "unpatrolled", "page", "language", "newpage", "user", "count", "added", "delta", "variation", "deleted"
+    ],
+    "events" : [
+      ["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
+      ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
+      ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0],
+      ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._73", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
+      ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._756", "en", "1", "MZMcBride", 1.0, 68.0, 68.0, 68.0, 0.0]
+    ]
+} ]
+```
+
+The biggest difference between the select query and the scan query is that the scan query does not retain all rows in memory before they are returned to the client. A select query can cause memory pressure when too many rows are requested; the scan query does not have this issue.
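+
+Because each batch holds at most `batchSize` rows, a client can drain the response with bounded memory. A minimal sketch (the `countRows` helper and the `batches` iterable are hypothetical; `getEvents()` comes from the `ScanResultValue` class added by this patch):
+
+```java
+import java.util.List;
+
+// Counts streamed rows batch by batch; only one batch is held at a time.
+// getEvents() holds a JSON array in both supported result formats, so the
+// List cast is valid for "list" and "compactedList" alike.
+static long countRows(Iterable<ScanResultValue> batches)
+{
+  long total = 0;
+  for (ScanResultValue batch : batches) {
+    total += ((List<?>) batch.getEvents()).size();
+  }
+  return total;
+}
+```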
+The scan query can also return all rows without issuing another pagination query, which is extremely useful when querying a historical or realtime node directly.
\ No newline at end of file
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 5813ccd78dcb..6a2e996ffd44 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -65,6 +65,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
 |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
 |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
 |druid-thrift-extensions|Support thrift ingestion|[link](../development/extensions-contrib/thrift.html)|
+|scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)|
 
 ## Promoting Community Extension to Core Extension
diff --git a/extensions-contrib/scan-query/pom.xml b/extensions-contrib/scan-query/pom.xml
new file mode 100644
index 000000000000..183ddbf68da3
--- /dev/null
+++ b/extensions-contrib/scan-query/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>io.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>0.9.3-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>io.druid.extensions.contrib</groupId>
+  <artifactId>scan-query</artifactId>
+  <name>scan-query</name>
+  <description>streaming version of select query</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java
new file mode 100644
index 000000000000..cdd2d1da4c13
--- /dev/null
+++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.query.scan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import io.druid.query.BaseQuery;
+import io.druid.query.DataSource;
+import io.druid.query.Query;
+import io.druid.query.TableDataSource;
+import io.druid.query.filter.DimFilter;
+import io.druid.query.filter.InDimFilter;
+import io.druid.query.filter.SelectorDimFilter;
+import io.druid.query.spec.LegacySegmentSpec;
+import io.druid.query.spec.QuerySegmentSpec;
+import org.joda.time.Interval;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("scan")
+public class ScanQuery extends BaseQuery<ScanResultValue>
+{
+  public static final String SCAN = "scan";
+  public static final String RESULT_FORMAT_LIST = "list";
+  public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList";
+  public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector";
+
+  private final String resultFormat;
+  private final int batchSize;
+  private final int limit;
+  private final DimFilter dimFilter;
+  private final List<String> columns;
+
+  @JsonCreator
+  public ScanQuery(
+      @JsonProperty("dataSource") DataSource dataSource,
+      @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+      @JsonProperty("resultFormat") String resultFormat,
+      @JsonProperty("batchSize") int batchSize,
+      @JsonProperty("limit") int limit,
+      @JsonProperty("filter") DimFilter dimFilter,
+      @JsonProperty("columns") List<String> columns,
+      @JsonProperty("context") Map<String, Object> context
+  )
+  {
+    super(dataSource, querySegmentSpec, false, context);
+    this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat;
+    this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
+    this.limit = (limit == 0) ?
Integer.MAX_VALUE : limit; + Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0"); + Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0"); + this.dimFilter = dimFilter; + this.columns = columns; + } + + @JsonProperty + public String getResultFormat() + { + return resultFormat; + } + + @JsonProperty + public int getBatchSize() + { + return batchSize; + } + + @JsonProperty + public int getLimit() + { + return limit; + } + + @Override + public boolean hasFilters() + { + return dimFilter != null; + } + + @Override + public DimFilter getFilter() + { + return dimFilter; + } + + @Override + public String getType() + { + return SCAN; + } + + @JsonProperty("filter") + public DimFilter getDimensionsFilter() + { + return dimFilter; + } + + @JsonProperty + public List getColumns() + { + return columns; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) + { + return new ScanQuery( + getDataSource(), + querySegmentSpec, + resultFormat, + batchSize, + limit, + dimFilter, + columns, + getContext() + ); + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return new ScanQuery( + dataSource, + getQuerySegmentSpec(), + resultFormat, + batchSize, + limit, + dimFilter, + columns, + getContext() + ); + } + + @Override + public Query withOverriddenContext(Map contextOverrides) + { + return new ScanQuery( + getDataSource(), + getQuerySegmentSpec(), + resultFormat, + batchSize, + limit, + dimFilter, + columns, + computeOverridenContext(contextOverrides) + ); + } + + public ScanQuery withDimFilter(DimFilter dimFilter) + { + return new ScanQuery( + getDataSource(), + getQuerySegmentSpec(), + resultFormat, + batchSize, + limit, + dimFilter, + columns, + getContext() + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + ScanQuery that = (ScanQuery) o; + + if (batchSize != that.batchSize) { + return false; + } + if (limit != that.limit) { + return false; + } + if (resultFormat != null ? !resultFormat.equals(that.resultFormat) : that.resultFormat != null) { + return false; + } + if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) { + return false; + } + return columns != null ? columns.equals(that.columns) : that.columns == null; + } + + @Override + public int hashCode() + { + int result = super.hashCode(); + result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0); + result = 31 * result + batchSize; + result = 31 * result + limit; + result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); + result = 31 * result + (columns != null ? columns.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "ScanQuery{" + + "dataSource='" + getDataSource() + '\'' + + ", querySegmentSpec=" + getQuerySegmentSpec() + + ", descending=" + isDescending() + + ", resultFormat='" + resultFormat + '\'' + + ", batchSize=" + batchSize + + ", limit=" + limit + + ", dimFilter=" + dimFilter + + ", columns=" + columns + + '}'; + } + + /** + * A Builder for ScanQuery. + *

+ * Required: dataSource(), intervals() must be called before build()
+ * <p/>
+ * Usage example:
+ * <pre><code>
+ *   ScanQuery query = new ScanQueryBuilder()
+ *                                  .dataSource("Example")
+ *                                  .intervals("2010/2013")
+ *                                  .build();
+ * </code></pre>
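+ * <p/>
+ * batchSize() and limit() are optional; leaving them at zero (the builder default)
+ * selects the defaults applied in the constructor: 20480 rows per batch and no row limit.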
+ * + * @see io.druid.query.scan.ScanQuery + */ + public static class ScanQueryBuilder + { + private DataSource dataSource; + private QuerySegmentSpec querySegmentSpec; + private Map context; + private String resultFormat; + private int batchSize; + private int limit; + private DimFilter dimFilter; + private List columns; + + public ScanQueryBuilder() + { + dataSource = null; + querySegmentSpec = null; + context = null; + resultFormat = null; + batchSize = 0; + limit = 0; + dimFilter = null; + columns = Lists.newArrayList(); + } + + public ScanQuery build() + { + return new ScanQuery( + dataSource, + querySegmentSpec, + resultFormat, + batchSize, + limit, + dimFilter, + columns, + context + ); + } + + public ScanQueryBuilder copy(ScanQueryBuilder builder) + { + return new ScanQueryBuilder() + .dataSource(builder.dataSource) + .intervals(builder.querySegmentSpec) + .context(builder.context); + } + + public ScanQueryBuilder dataSource(String ds) + { + dataSource = new TableDataSource(ds); + return this; + } + + public ScanQueryBuilder dataSource(DataSource ds) + { + dataSource = ds; + return this; + } + + public ScanQueryBuilder intervals(QuerySegmentSpec q) + { + querySegmentSpec = q; + return this; + } + + public ScanQueryBuilder intervals(String s) + { + querySegmentSpec = new LegacySegmentSpec(s); + return this; + } + + public ScanQueryBuilder intervals(List l) + { + querySegmentSpec = new LegacySegmentSpec(l); + return this; + } + + public ScanQueryBuilder context(Map c) + { + context = c; + return this; + } + + public ScanQueryBuilder resultFormat(String r) + { + resultFormat = r; + return this; + } + + public ScanQueryBuilder batchSize(int b) + { + batchSize = b; + return this; + } + + public ScanQueryBuilder limit(int l) + { + limit = l; + return this; + } + + public ScanQueryBuilder filters(String dimensionName, String value) + { + dimFilter = new SelectorDimFilter(dimensionName, value, null); + return this; + } + + public ScanQueryBuilder filters(String dimensionName, String value, String... values) + { + dimFilter = new InDimFilter(dimensionName, Lists.asList(value, values), null); + return this; + } + + public ScanQueryBuilder filters(DimFilter f) + { + dimFilter = f; + return this; + } + + public ScanQueryBuilder columns(List c) + { + columns = c; + return this; + } + + public ScanQueryBuilder columns(String... c) + { + columns = Arrays.asList(c); + return this; + } + } + + public static ScanQueryBuilder newScanQueryBuilder() + { + return new ScanQueryBuilder(); + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java new file mode 100644 index 000000000000..f33ea997b268 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java @@ -0,0 +1,53 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.query.scan;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import io.druid.guice.DruidBinders;
+import io.druid.guice.LazySingleton;
+import io.druid.initialization.DruidModule;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ScanQueryDruidModule implements DruidModule
+{
+  @Override
+  public void configure(Binder binder)
+  {
+    DruidBinders.queryToolChestBinder(binder)
+                .addBinding(ScanQuery.class)
+                .to(ScanQueryQueryToolChest.class)
+                .in(LazySingleton.class);
+
+    DruidBinders.queryRunnerFactoryBinder(binder)
+                .addBinding(ScanQuery.class)
+                .to(ScanQueryRunnerFactory.class)
+                .in(LazySingleton.class);
+  }
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return Arrays.asList(
+        new SimpleModule("ScanQueryDruidModule")
+            .registerSubtypes(
+                new NamedType(ScanQuery.class, ScanQuery.SCAN)
+            )
+    );
+  }
+}
diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java
new file mode 100644
index 000000000000..5b04bb19a8d6
--- /dev/null
+++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package io.druid.query.scan; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.guava.BaseSequence; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.ColumnSelectorPlus; +import io.druid.query.QueryInterruptedException; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.filter.Filter; +import io.druid.query.select.SelectQueryEngine; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionHandlerUtils; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.Segment; +import io.druid.segment.StorageAdapter; +import io.druid.segment.VirtualColumns; +import io.druid.segment.column.Column; +import io.druid.segment.filter.Filters; +import org.joda.time.Interval; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +public class ScanQueryEngine +{ + private static final SelectQueryEngine.SelectStrategyFactory STRATEGY_FACTORY = new SelectQueryEngine.SelectStrategyFactory(); + public Sequence process( + final ScanQuery query, + final Segment segment, + final Map responseContext + ) + { + if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) { + int count = (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); + if (count >= query.getLimit()) { + return Sequences.empty(); + } + } + final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT); + final long start = System.currentTimeMillis(); + final StorageAdapter adapter = segment.asStorageAdapter(); + + if (adapter == null) { + throw new ISE( + "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped." 
+ ); + } + + List allDims = Lists.newLinkedList(adapter.getAvailableDimensions()); + List allMetrics = Lists.newLinkedList(adapter.getAvailableMetrics()); + final List allColumns = Lists.newLinkedList(); + if (query.getColumns() != null && !query.getColumns().isEmpty()) { + if (!query.getColumns().contains(ScanResultValue.timestampKey)) { + allColumns.add(ScanResultValue.timestampKey); + } + allColumns.addAll(query.getColumns()); + allDims.retainAll(query.getColumns()); + allMetrics.retainAll(query.getColumns()); + } + else { + if (!allDims.contains(ScanResultValue.timestampKey)) { + allColumns.add(ScanResultValue.timestampKey); + } + allColumns.addAll(allDims); + allColumns.addAll(allMetrics); + } + final List dims = DefaultDimensionSpec.toSpec(allDims); + final List metrics = allMetrics; + + final List intervals = query.getQuerySegmentSpec().getIntervals(); + Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals); + + final String segmentId = segment.getIdentifier(); + + final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); + + if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) { + responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0); + } + final int limit = query.getLimit() - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); + return Sequences.concat( + Sequences.map( + adapter.makeCursors( + filter, + intervals.get(0), + VirtualColumns.EMPTY, + QueryGranularities.ALL, + query.isDescending() + ), + new Function>() + { + @Override + public Sequence apply(final Cursor cursor) + { + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + + final List> selectorPlusList = Arrays.asList( + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + Lists.newArrayList(dims), + cursor + ) + ); + + final Map metSelectors = Maps.newHashMap(); + for (String metric : metrics) { + final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + metSelectors.put(metric, metricSelector); + } + final int batchSize = query.getBatchSize(); + return new Iterator() + { + private int offset = 0; + + @Override + public boolean hasNext() + { + return !cursor.isDone() && offset < limit; + } + + @Override + public ScanResultValue next() + { + if (System.currentTimeMillis() >= timeoutAt) { + throw new QueryInterruptedException(new TimeoutException()); + } + int lastOffset = offset; + Object events = null; + String resultFormat = query.getResultFormat(); + if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) { + throw new UnsupportedOperationException("valueVector is not supported now"); + } else if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) { + events = rowsToCompactedList(); + } else { + events = rowsToList(); + } + responseContext.put( + ScanQueryRunnerFactory.CTX_COUNT, + (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) + ); + responseContext.put( + ScanQueryRunnerFactory.CTX_TIMEOUT_AT, + timeoutAt - (System.currentTimeMillis() - start) + ); + return new ScanResultValue(segmentId, allColumns, events); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + private Object rowsToCompactedList() + { + return Lists.transform( + (List>) rowsToList(), + new Function, 
Object>() + { + @Override + public Object apply(Map input) + { + List eventValues = Lists.newArrayListWithExpectedSize(allColumns.size()); + for (String expectedColumn : allColumns) { + eventValues.add(input.get(expectedColumn)); + } + return eventValues; + } + } + ); + } + + private Object rowsToList() + { + List> events = Lists.newArrayListWithCapacity(batchSize); + for (int i = 0; !cursor.isDone() + && i < batchSize + && offset < limit; cursor.advance(), i++, offset++) { + final Map theEvent = SelectQueryEngine.singleEvent( + ScanResultValue.timestampKey, + timestampColumnSelector, + selectorPlusList, + metSelectors + ); + events.add(theEvent); + } + return events; + } + + private Object rowsToValueVector() + { + // only support list now, we can support ValueVector or Arrow in future + return rowsToList(); + } + }; + } + + @Override + public void cleanup(Iterator iterFromMake) + { + } + } + ); + } + } + ) + ); + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java new file mode 100644 index 000000000000..78a11073f55e --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java @@ -0,0 +1,98 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.query.scan; + +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Yielder; +import io.druid.java.util.common.guava.YieldingAccumulator; +import io.druid.java.util.common.parsers.CloseableIterator; +import io.druid.query.QueryRunner; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ScanQueryLimitRowIterator implements CloseableIterator +{ + private Yielder yielder; + private String resultFormat; + private int limit = 0; + private int count = 0; + + public ScanQueryLimitRowIterator( + QueryRunner baseRunner, ScanQuery query, + Map responseContext + ) + { + resultFormat = query.getResultFormat(); + limit = query.getLimit(); + Sequence baseSequence = baseRunner.run(query, responseContext); + yielder = baseSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in) + { + yield(); + return in; + } + } + ); + } + + @Override + public boolean hasNext() + { + return !yielder.isDone() && count < limit; + } + + @Override + public ScanResultValue next() + { + ScanResultValue batch = yielder.get(); + if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) || + ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) { + List events = (List) batch.getEvents(); + if (events.size() <= limit - count) { + count += events.size(); + yielder = yielder.next(null); + return batch; + } else { + // last batch + int left = limit - count; + count = limit; + return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left)); + } + } + throw new UnsupportedOperationException(ScanQuery.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet"); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException + { + yielder.close(); + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java new file mode 100644 index 000000000000..222efda0a8e1 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -0,0 +1,114 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.query.scan; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.java.util.common.guava.BaseSequence; +import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.guava.Sequence; +import io.druid.query.DruidMetrics; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryToolChest; +import io.druid.query.aggregation.MetricManipulationFn; + +import java.util.Map; + +public class ScanQueryQueryToolChest extends QueryToolChest +{ + private static final TypeReference TYPE_REFERENCE = new TypeReference() + { + }; + + @Override + public QueryRunner mergeResults(final QueryRunner runner) + { + return new QueryRunner() + { + @Override + public Sequence run( + final Query query, final Map responseContext + ) + { + ScanQuery scanQuery = (ScanQuery) query; + if (scanQuery.getLimit() == Integer.MAX_VALUE) { + return runner.run(query, responseContext); + } + return new BaseSequence<>( + new BaseSequence.IteratorMaker() + { + @Override + public ScanQueryLimitRowIterator make() + { + return new ScanQueryLimitRowIterator(runner, (ScanQuery) query, responseContext); + } + + @Override + public void cleanup(ScanQueryLimitRowIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + } + ); + } + }; + } + + @Override + public ServiceMetricEvent.Builder makeMetricBuilder(ScanQuery query) + { + return DruidMetrics.makePartialQueryTimeMetric(query); + } + + @Override + public Function makePreComputeManipulatorFn( + ScanQuery query, MetricManipulationFn fn + ) + { + return Functions.identity(); + } + + @Override + public TypeReference getResultTypeReference() + { + return TYPE_REFERENCE; + } + + @Override + public QueryRunner preMergeQueryDecoration(final QueryRunner runner) + { + return new QueryRunner() + { + @Override + public Sequence run( + Query query, Map responseContext + ) + { + ScanQuery scanQuery = (ScanQuery) query; + if (scanQuery.getDimensionsFilter() != null) { + scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize()); + } + return runner.run(scanQuery, responseContext); + } + }; + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java new file mode 100644 index 000000000000..6b1244c5ff8a --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.query.scan; + +import com.google.common.base.Function; +import com.google.inject.Inject; +import io.druid.common.utils.JodaUtils; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Query; +import io.druid.query.QueryContextKeys; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.segment.Segment; + +import java.util.Map; +import java.util.concurrent.ExecutorService; + +public class ScanQueryRunnerFactory implements QueryRunnerFactory +{ + public static final String CTX_TIMEOUT_AT = "timeoutAt"; + public static final String CTX_COUNT = "count"; + private final ScanQueryQueryToolChest toolChest; + private final ScanQueryEngine engine; + + @Inject + public ScanQueryRunnerFactory( + ScanQueryQueryToolChest toolChest, + ScanQueryEngine engine + ) + { + this.toolChest = toolChest; + this.engine = engine; + } + + @Override + public QueryRunner createRunner(Segment segment) + { + return new ScanQueryRunner(engine, segment); + } + + @Override + public QueryRunner mergeRunners( + ExecutorService queryExecutor, + final Iterable> queryRunners + ) + { + // in single thread and in jetty thread instead of processing thread + return new QueryRunner() + { + @Override + public Sequence run( + final Query query, final Map responseContext + ) + { + final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); + final long timeoutAt = queryTimeout == null + ? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue(); + responseContext.put(CTX_TIMEOUT_AT, timeoutAt); + return Sequences.concat( + Sequences.map( + Sequences.simple(queryRunners), + new Function, Sequence>() + { + @Override + public Sequence apply(final QueryRunner input) + { + return input.run(query, responseContext); + } + } + ) + ); + } + }; + } + + @Override + public QueryToolChest getToolchest() + { + return toolChest; + } + + private class ScanQueryRunner implements QueryRunner + { + private final ScanQueryEngine engine; + private final Segment segment; + + public ScanQueryRunner(ScanQueryEngine engine, Segment segment) + { + this.engine = engine; + this.segment = segment; + } + + @Override + public Sequence run( + Query query, Map responseContext + ) + { + if (!(query instanceof ScanQuery)) { + throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class); + } + + // it happens in unit tests + if (responseContext.get(CTX_TIMEOUT_AT) == null) { + responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT); + }; + return engine.process((ScanQuery) query, segment, responseContext); + } + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java new file mode 100644 index 000000000000..76d8bf5dba97 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -0,0 +1,115 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.query.scan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class ScanResultValue implements Comparable +{ + public static final String timestampKey = "timestamp"; + + private final String segmentId; + private final List columns; + private final Object events; + + @JsonCreator + public ScanResultValue( + @JsonProperty("segmentId") String segmentId, + @JsonProperty("columns") List columns, + @JsonProperty("events") Object events + ) + { + this.segmentId = segmentId; + this.columns = columns; + this.events = events; + } + + @JsonProperty + public String getSegmentId() + { + return segmentId; + } + + @JsonProperty + public List getColumns() + { + return columns; + } + + @JsonProperty + public Object getEvents() + { + return events; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ScanResultValue that = (ScanResultValue) o; + + if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) { + return false; + } + if (columns != null ? !columns.equals(that.columns) : that.columns != null) { + return false; + } + return events != null ? events.equals(that.events) : that.events == null; + } + + @Override + public int hashCode() + { + int result = segmentId != null ? segmentId.hashCode() : 0; + result = 31 * result + (columns != null ? columns.hashCode() : 0); + result = 31 * result + (events != null ? events.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "ScanResultValue{" + + "segmentId='" + segmentId + '\'' + + ", columns=" + columns + + ", events=" + events + + '}'; + } + + @Override + public int compareTo(ScanResultValue that) + { + if (that == null) { + return 1; + } + if (segmentId != null && that.segmentId != null) { + return segmentId.compareTo(that.segmentId); + } + return segmentId != null ? 1 : 0; + } +} diff --git a/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..1459501bf4a8 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.query.scan.ScanQueryDruidModule \ No newline at end of file diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java new file mode 100644 index 000000000000..f2fbf0f5f41e --- /dev/null +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.io.CharSource; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.guava.MergeSequence; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.TableDataSource; +import io.druid.query.select.SelectQueryRunnerTest; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.Segment; +import io.druid.segment.TestIndex; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +@RunWith(Parameterized.class) +public class MultiSegmentScanQueryTest +{ + private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(); + + private static final QueryRunnerFactory factory = new ScanQueryRunnerFactory( + toolChest, + new ScanQueryEngine() + ); + + // time modified version of druid.sample.tsv + public static final String[] V_0112 = { + "2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000", + "2011-01-12T01:00:00.000Z spot business preferred bpreferred 100.000000", + "2011-01-12T02:00:00.000Z spot entertainment preferred epreferred 100.000000", + "2011-01-12T03:00:00.000Z spot health preferred hpreferred 100.000000", + "2011-01-12T04:00:00.000Z spot mezzanine preferred mpreferred 100.000000", + "2011-01-12T05:00:00.000Z spot news preferred npreferred 100.000000", + "2011-01-12T06:00:00.000Z spot premium preferred ppreferred 100.000000", + "2011-01-12T07:00:00.000Z spot technology preferred tpreferred 100.000000", + "2011-01-12T08:00:00.000Z spot travel preferred tpreferred 100.000000", + "2011-01-12T09:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000", + "2011-01-12T10:00:00.000Z total_market premium preferred ppreferred 1000.000000", + "2011-01-12T11:00:00.000Z upfront mezzanine preferred mpreferred 
800.000000 value", + "2011-01-12T12:00:00.000Z upfront premium preferred ppreferred 800.000000 value", + "2011-01-12T13:00:00.000Z upfront premium preferred ppreferred2 800.000000 value" + }; + public static final String[] V_0113 = { + "2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713", + "2011-01-13T01:00:00.000Z spot business preferred bpreferred 103.629399", + "2011-01-13T02:00:00.000Z spot entertainment preferred epreferred 110.087299", + "2011-01-13T03:00:00.000Z spot health preferred hpreferred 114.947403", + "2011-01-13T04:00:00.000Z spot mezzanine preferred mpreferred 104.465767", + "2011-01-13T05:00:00.000Z spot news preferred npreferred 102.851683", + "2011-01-13T06:00:00.000Z spot premium preferred ppreferred 108.863011", + "2011-01-13T07:00:00.000Z spot technology preferred tpreferred 111.356672", + "2011-01-13T08:00:00.000Z spot travel preferred tpreferred 106.236928", + "2011-01-13T09:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505", + "2011-01-13T10:00:00.000Z total_market premium preferred ppreferred 1689.012875", + "2011-01-13T11:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value", + "2011-01-13T12:00:00.000Z upfront premium preferred ppreferred 1564.617729 value" + }; + + private static Segment segment0; + private static Segment segment1; + + @BeforeClass + public static void setup() throws IOException + { + CharSource v_0112 = CharSource.wrap(StringUtils.join(V_0112, "\n")); + CharSource v_0113 = CharSource.wrap(StringUtils.join(V_0113, "\n")); + + IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIndex("2011-01-12T00:00:00.000Z"), v_0112); + IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIndex("2011-01-13T00:00:00.000Z"), v_0113); + + segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1")); + segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1")); + } + + private static String makeIdentifier(IncrementalIndex index, String version) + { + return makeIdentifier(index.getInterval(), version); + } + + private static String makeIdentifier(Interval interval, String version) + { + return DataSegment.makeDataSegmentIdentifier( + QueryRunnerTestHelper.dataSource, + interval.getStart(), + interval.getEnd(), + version, + NoneShardSpec.instance() + ); + } + + private static IncrementalIndex newIndex(String minTimeStamp) + { + return newIndex(minTimeStamp, 10000); + } + + private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount) + { + final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(new DateTime(minTimeStamp).getMillis()) + .withQueryGranularity(QueryGranularities.HOUR) + .withMetrics(TestIndex.METRIC_AGGS) + .build(); + return new OnheapIncrementalIndex(schema, true, maxRowCount); + } + + @AfterClass + public static void clear() + { + IOUtils.closeQuietly(segment0); + IOUtils.closeQuietly(segment1); + } + + @Parameterized.Parameters(name = "limit={0},batchSize={1}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.cartesian(Arrays.asList(0, 1, 3, 7, 10, 20, 1000), Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000)); + } + + private final int limit; + private final int batchSize; + + public MultiSegmentScanQueryTest(int limit, int batchSize) + { + this.limit = limit; + this.batchSize = batchSize; + } + + private ScanQuery.ScanQueryBuilder newBuilder() + { + return ScanQuery.newScanQueryBuilder() + .dataSource(new 
TableDataSource(QueryRunnerTestHelper.dataSource)) + .intervals(SelectQueryRunnerTest.I_0112_0114) + .batchSize(batchSize) + .columns(Arrays.asList()) + .limit(limit); + } + + @Test + public void testMergeRunnersWithLimit() + { + ScanQuery query = newBuilder().build(); + List results = Sequences.toList( + factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of( + factory.createRunner(segment0), + factory.createRunner(segment1) + )).run(query, new HashMap()), + Lists.newArrayList() + ); + int totalCount = 0; + for (ScanResultValue result : results) { + System.out.println(((List) result.getEvents()).size()); + totalCount += ((List) result.getEvents()).size(); + } + Assert.assertEquals( + totalCount, + limit != 0 ? Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length + ); + } + + @Test + public void testMergeResultsWithLimit() + { + QueryRunner runner = toolChest.mergeResults( + new QueryRunner() { + @Override + public Sequence run( + Query query, Map responseContext + ) + { + // simulate results back from 2 historicals + List> sequences = Lists.newArrayListWithExpectedSize(2); + sequences.add(factory.createRunner(segment0).run(query, new HashMap())); + sequences.add(factory.createRunner(segment1).run(query, new HashMap())); + return new MergeSequence<>( + query.getResultOrdering(), + Sequences.simple(sequences) + ); + } + } + ); + ScanQuery query = newBuilder().build(); + List results = Sequences.toList( + runner.run(query, new HashMap()), + Lists.newArrayList() + ); + int totalCount = 0; + for (ScanResultValue result : results) { + totalCount += ((List) result.getEvents()).size(); + } + Assert.assertEquals( + totalCount, + limit != 0 ? Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length + ); + } +} diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java new file mode 100644 index 000000000000..e5277256c5e2 --- /dev/null +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java @@ -0,0 +1,605 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.scan; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.ObjectArrays; +import com.google.common.collect.Sets; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.TableDataSource; +import io.druid.query.extraction.MapLookupExtractor; +import io.druid.query.filter.AndDimFilter; +import io.druid.query.filter.DimFilter; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.lookup.LookupExtractionFn; +import io.druid.query.spec.LegacySegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + */ +@RunWith(Parameterized.class) +public class ScanQueryRunnerTest +{ + // copied from druid.sample.tsv + public static final String[] V_0112 = { + "2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000", + "2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000", + "2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000", + "2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000", + "2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000", + "2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000", + "2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000", + "2011-01-12T00:00:00.000Z upfront mezzanine preferred mpreferred 800.000000 value", + "2011-01-12T00:00:00.000Z upfront premium preferred ppreferred 800.000000 value" + }; + public static final String[] V_0113 = { + "2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713", + "2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399", + "2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299", + "2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403", + "2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767", + "2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683", + "2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011", + "2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672", + "2011-01-13T00:00:00.000Z spot travel preferred tpreferred 106.236928", + "2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505", + "2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875", + "2011-01-13T00:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value", + "2011-01-13T00:00:00.000Z upfront premium preferred ppreferred 1564.617729 value" + }; + + public static final QuerySegmentSpec I_0112_0114 = 
new LegacySegmentSpec( + new Interval("2011-01-12/2011-01-14") + ); + public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class); + + private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(); + + @Parameterized.Parameters(name = "{0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.cartesian( + QueryRunnerTestHelper.makeQueryRunners( + new ScanQueryRunnerFactory( + toolChest, + new ScanQueryEngine() + ) + ) + ); + } + + private final QueryRunner runner; + + public ScanQueryRunnerTest(QueryRunner runner) + { + this.runner = runner; + } + + private ScanQuery.ScanQueryBuilder newTestQuery() + { + return ScanQuery.newScanQueryBuilder() + .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) + .columns(Arrays.asList()) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .limit(3); + } + + @Test + public void testFullOnSelect() + { + List columns = Lists.newArrayList( + ScanResultValue.timestampKey, + "market", + "quality", + "placement", + "placementish", + "partial_null_column", + "null_column", + "index", + "indexMin", + "indexMaxPlusTen", + "quality_uniques" + ); + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List expectedResults = toExpected( + toFullEvents(V_0112_0114), + columns, + 0, + 3 + ); + verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column")); + } + + @Test + public void testFullOnSelectAsCompactedList() + { + final List columns = Lists.newArrayList( + ScanResultValue.timestampKey, + "market", + "quality", + "placement", + "placementish", + "partial_null_column", + "null_column", + "index", + "indexMin", + "indexMaxPlusTen", + "quality_uniques" + ); + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List expectedResults = toExpected( + toFullEvents(V_0112_0114), + columns, + 0, + 3 + ); + verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(compactedListToRow(results), "null_column")); + } + + @Test + public void testSelectWithDimsAndMets() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List expectedResults = toExpected( + toEvents( + new String[]{ + ScanResultValue.timestampKey + ":TIME", + QueryRunnerTestHelper.marketDimension + ":STRING", + null, + null, + null, + QueryRunnerTestHelper.indexMetric + ":FLOAT" + }, + V_0112_0114 + ), + Lists.newArrayList(ScanResultValue.timestampKey, "market", "index"), + 0, + 3 + ); + verify(expectedResults, results); + } + + @Test + public void testSelectWithDimsAndMetsAsCompactedList() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List 
+        toEvents(
+            new String[]{
+                ScanResultValue.timestampKey + ":TIME",
+                QueryRunnerTestHelper.marketDimension + ":STRING",
+                null,
+                null,
+                null,
+                QueryRunnerTestHelper.indexMetric + ":FLOAT"
+            },
+            V_0112_0114
+        ),
+        Lists.newArrayList(ScanResultValue.timestampKey, "market", "index"),
+        0,
+        3
+    );
+    verify(expectedResults, compactedListToRow(results));
+  }
+
+  @Test
+  public void testFullOnSelectWithFilterAndLimit()
+  {
+    // limits
+    for (int limit : new int[]{3, 1, 5, 7, 0}) {
+      ScanQuery query = newTestQuery()
+          .intervals(I_0112_0114)
+          .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
+          .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
+          .limit(limit)
+          .build();
+
+      HashMap<String, Object> context = new HashMap<String, Object>();
+      Iterable<ScanResultValue> results = Sequences.toList(
+          runner.run(query, context),
+          Lists.<ScanResultValue>newArrayList()
+      );
+
+      final List<List<Map<String, Object>>> events = toEvents(
+          new String[]{
+              ScanResultValue.timestampKey + ":TIME",
+              null,
+              QueryRunnerTestHelper.qualityDimension + ":STRING",
+              null,
+              null,
+              QueryRunnerTestHelper.indexMetric + ":FLOAT"
+          },
+          // filtered values with day granularity
+          new String[]{
+              "2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000",
+              "2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000"
+          },
+          new String[]{
+              "2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713",
+              "2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399",
+              "2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299",
+              "2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403",
+              "2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767",
+              "2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683",
+              "2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011",
+              "2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672",
+              "2011-01-13T00:00:00.000Z spot travel preferred tpreferred 106.236928"
+          }
+      );
+
+      List<ScanResultValue> expectedResults = toExpected(
+          events,
+          Lists.newArrayList(ScanResultValue.timestampKey, "quality", "index"),
+          0,
+          limit
+      );
+      verify(expectedResults, results);
+    }
+  }
+
+  @Test
+  public void testSelectWithFilterLookupExtractionFn()
+  {
+    Map<String, String> extractionMap = new HashMap<>();
+    extractionMap.put("total_market", "replaced");
+    MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap, false);
+    LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true);
+    ScanQuery query = newTestQuery()
+        .intervals(I_0112_0114)
+        .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "replaced", lookupExtractionFn))
+        .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
+        .build();
+
+    Iterable<ScanResultValue> results = Sequences.toList(
+        runner.run(query, Maps.newHashMap()),
+        Lists.<ScanResultValue>newArrayList()
+    );
+    Iterable<ScanResultValue> resultsOptimize = Sequences.toList(
+        toolChest.postMergeQueryDecoration(toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))).
+            run(query, Maps.newHashMap()), Lists.<ScanResultValue>newArrayList()
+    );
+
+    final List<List<Map<String, Object>>> events = toEvents(
+        new String[]{
+            ScanResultValue.timestampKey + ":TIME",
+            null,
+            QueryRunnerTestHelper.qualityDimension + ":STRING",
+            null,
+            null,
+            QueryRunnerTestHelper.indexMetric + ":FLOAT"
+        },
+        // filtered values with day granularity
+        new String[]{
+            "2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
+            "2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000"
+        },
+        new String[]{
+            "2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
+            "2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875"
+        }
+    );
+
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        Lists.newArrayList(ScanResultValue.timestampKey, QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric),
+        0,
+        3
+    );
+
+    verify(expectedResults, results);
+    verify(expectedResults, resultsOptimize);
+  }
+
+  @Test
+  public void testFullSelectNoResults()
+  {
+    ScanQuery query = newTestQuery()
+        .intervals(I_0112_0114)
+        .filters(
+            new AndDimFilter(
+                Arrays.<DimFilter>asList(
+                    new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null),
+                    new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "foo", null)
+                )
+            )
+        )
+        .build();
+
+    Iterable<ScanResultValue> results = Sequences.toList(
+        runner.run(query, Maps.newHashMap()),
+        Lists.<ScanResultValue>newArrayList()
+    );
+
+    List<ScanResultValue> expectedResults = Arrays.asList(
+    );
+
+    verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
+  }
+
+  @Test
+  public void testFullSelectNoDimensionAndMetric()
+  {
+    ScanQuery query = newTestQuery()
+        .intervals(I_0112_0114)
+        .columns("foo", "foo2")
+        .build();
+
+    Iterable<ScanResultValue> results = Sequences.toList(
+        runner.run(query, Maps.newHashMap()),
+        Lists.<ScanResultValue>newArrayList()
+    );
+
+    final List<List<Map<String, Object>>> events = toEvents(
+        new String[]{
+            ScanResultValue.timestampKey + ":TIME"
+        },
+        V_0112_0114
+    );
+
+    List<ScanResultValue> expectedResults = toExpected(
+        events,
+        Lists.newArrayList(ScanResultValue.timestampKey, "foo", "foo2"),
+        0,
+        3
+    );
+    verify(expectedResults, results);
+  }
+
+  private List<List<Map<String, Object>>> toFullEvents(final String[]... valueSet)
+  {
+    return toEvents(
+        new String[]{
+            ScanResultValue.timestampKey + ":TIME",
+            QueryRunnerTestHelper.marketDimension + ":STRING",
+            QueryRunnerTestHelper.qualityDimension + ":STRING",
+            QueryRunnerTestHelper.placementDimension + ":STRING",
+            QueryRunnerTestHelper.placementishDimension + ":STRINGS",
+            QueryRunnerTestHelper.indexMetric + ":FLOAT",
+            QueryRunnerTestHelper.partialNullDimension + ":STRING"
+        },
+        valueSet
+    );
+  }
+
+  private List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, final String[]... valueSet)
+  {
+    List<String> values = Lists.newArrayList();
+    for (String[] vSet : valueSet) {
+      values.addAll(Arrays.asList(vSet));
+    }
+    List<List<Map<String, Object>>> events = Lists.newArrayList();
+    events.add(
+        Lists.newArrayList(
+            Iterables.transform(
+                values, new Function<String, Map<String, Object>>()
+                {
+                  @Override
+                  public Map<String, Object> apply(String input)
+                  {
+                    Map<String, Object> event = Maps.newHashMap();
+                    String[] values = input.split("\\t");
+                    for (int i = 0; i < dimSpecs.length; i++) {
+                      if (dimSpecs[i] == null || i >= values.length) {
+                        continue;
+                      }
+                      String[] specs = dimSpecs[i].split(":");
+                      event.put(
+                          specs[0],
+                          specs.length == 1 || specs[1].equals("STRING") ? values[i] :
+                          specs[1].equals("TIME") ? new DateTime(values[i]) :
+                          specs[1].equals("FLOAT") ? Float.valueOf(values[i]) :
+                          specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) :
+                          specs[1].equals("LONG") ? Long.valueOf(values[i]) :
+                          specs[1].equals("NULL") ? null :
+                          specs[1].equals("STRINGS") ? Arrays.asList(values[i].split("\u0001")) :
+                          values[i]
+                      );
+                    }
+                    return event;
+                  }
+                }
+            )
+        )
+    );
+    return events;
+  }
+
+  private List<ScanResultValue> toExpected(
+      List<List<Map<String, Object>>> targets,
+      List<String> columns,
+      final int offset,
+      final int limit
+  )
+  {
+    List<ScanResultValue> expected = Lists.newArrayListWithExpectedSize(targets.size());
+    for (List<Map<String, Object>> group : targets) {
+      List<Map<String, Object>> events = Lists.newArrayListWithExpectedSize(limit);
+      int end = Math.min(group.size(), offset + limit);
+      if (end == 0) {
+        end = group.size();
+      }
+      events.addAll(group.subList(offset, end));
+      expected.add(
+          new ScanResultValue(
+              QueryRunnerTestHelper.segmentId,
+              columns,
+              events
+          )
+      );
+    }
+    return expected;
+  }
+
+  private static void verify(
+      Iterable<ScanResultValue> expectedResults,
+      Iterable<ScanResultValue> actualResults
+  )
+  {
+    Iterator<ScanResultValue> expectedIter = expectedResults.iterator();
+    Iterator<ScanResultValue> actualIter = actualResults.iterator();
+
+    while (expectedIter.hasNext()) {
+      ScanResultValue expected = expectedIter.next();
+      ScanResultValue actual = actualIter.next();
+
+      Assert.assertEquals(expected.getSegmentId(), actual.getSegmentId());
+
+      Set<String> exColumns = Sets.newTreeSet(expected.getColumns());
+      Set<String> acColumns = Sets.newTreeSet(actual.getColumns());
+      Assert.assertEquals(exColumns, acColumns);
+
+      Iterator<Map<String, Object>> expectedEvts = ((List<Map<String, Object>>) expected.getEvents()).iterator();
+      Iterator<Map<String, Object>> actualEvts = ((List<Map<String, Object>>) actual.getEvents()).iterator();
+
+      while (expectedEvts.hasNext()) {
+        Map<String, Object> exHolder = expectedEvts.next();
+        Map<String, Object> acHolder = actualEvts.next();
+
+        for (Map.Entry<String, Object> ex : exHolder.entrySet()) {
+          Object actVal = acHolder.get(ex.getKey());
+
+          // work around for current IncrementalIndex (II) limitations: float metrics come
+          // back as Double, so compare them as Float
+          if (acHolder.get(ex.getKey()) instanceof Double) {
+            actVal = ((Double) actVal).floatValue();
+          }
+          Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
+        }
+      }
+
+      if (actualEvts.hasNext()) {
+        throw new ISE("This event iterator should be exhausted!");
+      }
+    }
+
+    if (actualIter.hasNext()) {
+      throw new ISE("This iterator should be exhausted!");
+    }
+  }
+
+  private static Iterable<ScanResultValue> populateNullColumnAtLastForQueryableIndexCase(
+      Iterable<ScanResultValue> results,
+      String columnName
+  )
+  {
+    // A QueryableIndex-backed segment does not report a column that is entirely null,
+    // so append the column to the result's column list before verifying.
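+    // Every ScanResultValue from the same segment shares one column list, so checking the
+    // first result is assumed sufficient here: if it already has the column, all of them do.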
+    for (ScanResultValue value : results) {
+      List<String> columns = value.getColumns();
+      if (columns.contains(columnName)) {
+        break;
+      }
+      columns.add(columnName);
+    }
+
+    return results;
+  }
+
+  private Iterable<ScanResultValue> compactedListToRow(Iterable<ScanResultValue> results)
+  {
+    return Iterables.transform(results, new Function<ScanResultValue, ScanResultValue>()
+    {
+      @Override
+      public ScanResultValue apply(ScanResultValue input)
+      {
+        List<Map<String, Object>> mapEvents = Lists.newLinkedList();
+        List<List<Object>> events = (List<List<Object>>) input.getEvents();
+        for (int i = 0; i < events.size(); i++) {
+          Iterator<Object> compactedEventIter = events.get(i).iterator();
+          Map<String, Object> mapEvent = new LinkedHashMap<>();
+          for (String column : input.getColumns()) {
+            mapEvent.put(column, compactedEventIter.next());
+          }
+          mapEvents.add(mapEvent);
+        }
+        return new ScanResultValue(input.getSegmentId(), input.getColumns(), mapEvents);
+      }
+    });
+  }
+}
diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java
new file mode 100644
index 000000000000..64d92b71547f
--- /dev/null
+++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.scan;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.TableDataSource;
+import io.druid.query.spec.LegacySegmentSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class ScanQuerySpecTest
+{
+  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  @Test
+  public void testSerializationLegacyString() throws Exception
+  {
+    String legacy =
+        "{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"}," +
+        "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]}," +
+        "\"filter\":null," +
+        "\"columns\":[\"market\",\"quality\",\"index\"]," +
+        "\"limit\":3," +
+        "\"context\":null}";
+
+    String current =
+        "{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"}," +
+        "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]}," +
+        "\"resultFormat\":\"list\"," +
+        "\"batchSize\":20480," +
+        "\"limit\":3," +
+        "\"filter\":null," +
+        "\"columns\":[\"market\",\"quality\",\"index\"]," +
+        "\"context\":null," +
+        "\"descending\":false}";
+
+    ScanQuery query = new ScanQuery(
+        new TableDataSource(QueryRunnerTestHelper.dataSource),
+        new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
+        null,
+        0,
+        3,
+        null,
+        Arrays.asList("market", "quality", "index"),
+        null
+    );
+
+    String actual = jsonMapper.writeValueAsString(query);
+    Assert.assertEquals(current, actual);
+    Assert.assertEquals(query, jsonMapper.readValue(actual, ScanQuery.class));
+    Assert.assertEquals(query, jsonMapper.readValue(legacy, ScanQuery.class));
+  }
+}
diff --git a/pom.xml b/pom.xml
index 65e56988b509..685930c808fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,7 @@
         <module>extensions-contrib/virtual-columns</module>
         <module>extensions-contrib/thrift-extensions</module>
         <module>extensions-contrib/ambari-metrics-emitter</module>
+        <module>extensions-contrib/scan-query</module>
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
index 411e5db119c6..41597aab2bb8 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
@@ -64,7 +64,7 @@ public class SelectQueryEngine
 {
   private static final SelectStrategyFactory STRATEGY_FACTORY = new SelectStrategyFactory();
 
-  private static class SelectStrategyFactory implements ColumnSelectorStrategyFactory<SelectColumnSelectorStrategy>
+  public static class SelectStrategyFactory implements ColumnSelectorStrategyFactory<SelectColumnSelectorStrategy>
   {
     @Override
     public SelectColumnSelectorStrategy makeColumnSelectorStrategy(
@@ -202,23 +202,12 @@ public Result<SelectResultValue> apply(Cursor cursor)
             int lastOffset = offset.startOffset();
             for (; !cursor.isDone() && offset.hasNext(); cursor.advance(), offset.next()) {
-              final Map<String, Object> theEvent = Maps.newLinkedHashMap();
-              theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));
-
-              for (ColumnSelectorPlus<SelectColumnSelectorStrategy> selectorPlus : selectorPlusList) {
-                selectorPlus.getColumnSelectorStrategy().addRowValuesToSelectResult(selectorPlus.getOutputName(), selectorPlus.getSelector(), theEvent);
-              }
-
-              for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
-                final String metric = metSelector.getKey();
-                final ObjectColumnSelector selector = metSelector.getValue();
-
-                if (selector == null) {
-                  theEvent.put(metric, null);
-                } else {
-                  theEvent.put(metric, selector.get());
-                }
-              }
+              final Map<String, Object> theEvent = singleEvent(
+                  EventHolder.timestampKey,
+                  timestampColumnSelector,
+                  selectorPlusList,
+                  metSelectors
+              );
 
               builder.addEntry(
                   new EventHolder(
@@ -236,4 +225,31 @@ public Result<SelectResultValue> apply(Cursor cursor)
         }
     );
   }
+
+  public static Map<String, Object> singleEvent(
+      String timestampKey,
+      LongColumnSelector timestampColumnSelector,
+      List<ColumnSelectorPlus<SelectColumnSelectorStrategy>> selectorPlusList,
+      Map<String, ObjectColumnSelector> metSelectors
+  )
+  {
+    final Map<String, Object> theEvent = Maps.newLinkedHashMap();
+    theEvent.put(timestampKey, new DateTime(timestampColumnSelector.get()));
+
+    for (ColumnSelectorPlus<SelectColumnSelectorStrategy> selectorPlus : selectorPlusList) {
+      selectorPlus.getColumnSelectorStrategy().addRowValuesToSelectResult(selectorPlus.getOutputName(), selectorPlus.getSelector(), theEvent);
+    }
+
+    for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
+      final String metric = metSelector.getKey();
+      final ObjectColumnSelector selector = metSelector.getValue();
+
+      if (selector == null) {
+        theEvent.put(metric, null);
+      } else {
+        theEvent.put(metric, selector.get());
+      }
+    }
+    return theEvent;
+  }
 }
diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java
index cee47b3def78..b9861ee5e61f 100644
--- a/server/src/main/java/io/druid/server/QueryResource.java
+++ b/server/src/main/java/io/druid/server/QueryResource.java
@@ -250,38 +250,43 @@ public Object accumulate(Object accumulated, Object in)
               @Override
              public void write(OutputStream outputStream) throws IOException, WebApplicationException
              {
-                // json serializer will always close the yielder
-                CountingOutputStream os = new CountingOutputStream(outputStream);
-                jsonWriter.writeValue(os, yielder);
-
-                os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
-                os.close();
-                successfulQueryCount.incrementAndGet();
-                final long queryTime = System.currentTimeMillis() - start;
-                emitter.emit(
-                    DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
-                        .setDimension("success", "true")
-                        .build("query/time", queryTime)
-                );
-                emitter.emit(
-                    DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
-                        .build("query/bytes", os.getCount())
-                );
-
-                requestLogger.log(
-                    new RequestLogLine(
-                        new DateTime(start),
-                        req.getRemoteAddr(),
-                        theQuery,
-                        new QueryStats(
-                            ImmutableMap.of(
-                                "query/time", queryTime,
-                                "query/bytes", os.getCount(),
-                                "success", true
-                            )
-                        )
-                    )
-                );
+                try {
+                  // json serializer will always close the yielder
+                  CountingOutputStream os = new CountingOutputStream(outputStream);
+                  jsonWriter.writeValue(os, yielder);
+
+                  os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
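+                  // close the CountingOutputStream explicitly so its byte count is final
+                  // before it is reported as query/bytes below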
+                  os.close();
+                  successfulQueryCount.incrementAndGet();
+                  final long queryTime = System.currentTimeMillis() - start;
+                  emitter.emit(
+                      DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
+                          .setDimension("success", "true")
+                          .build("query/time", queryTime)
+                  );
+                  emitter.emit(
+                      DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
+                          .build("query/bytes", os.getCount())
+                  );
+
+                  requestLogger.log(
+                      new RequestLogLine(
+                          new DateTime(start),
+                          req.getRemoteAddr(),
+                          theQuery,
+                          new QueryStats(
+                              ImmutableMap.of(
+                                  "query/time", queryTime,
+                                  "query/bytes", os.getCount(),
+                                  "success", true
+                              )
+                          )
+                      )
+                  );
+                }
+                finally {
+                  Thread.currentThread().setName(currThreadName);
+                }
              }
            },
            context.getContentType()