From e99faa0dc4d5521a04d843a41651d2eec3ba1049 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 3 Aug 2016 23:20:15 +0800 Subject: [PATCH 01/12] streaming version of select query --- distribution/pom.xml | 2 + extensions-contrib/scan-query/pom.xml | 47 ++++ .../java/io/druid/query/scan/ScanQuery.java | 247 ++++++++++++++++++ .../query/scan/ScanQueryDruidModule.java | 53 ++++ .../io/druid/query/scan/ScanQueryEngine.java | 168 ++++++++++++ .../query/scan/ScanQueryQueryToolChest.java | 92 +++++++ .../query/scan/ScanQueryRunnerFactory.java | 95 +++++++ .../io/druid/query/scan/ScanResultValue.java | 120 +++++++++ .../io.druid.initialization.DruidModule | 1 + pom.xml | 1 + .../druid/query/select/SelectQueryEngine.java | 50 ++-- 11 files changed, 859 insertions(+), 17 deletions(-) create mode 100644 extensions-contrib/scan-query/pom.xml create mode 100644 extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java create mode 100644 extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java create mode 100644 extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java create mode 100644 extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java create mode 100644 extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java create mode 100644 extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java create mode 100644 extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule diff --git a/distribution/pom.xml b/distribution/pom.xml index 8908e0332a4d..044728d62911 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -104,6 +104,8 @@ io.druid.extensions:mysql-metadata-storage -c io.druid.extensions:postgresql-metadata-storage + -c + io.druid.extensions.contrib:scan-query ${druid.distribution.pulldeps.opts} diff --git 
a/extensions-contrib/scan-query/pom.xml b/extensions-contrib/scan-query/pom.xml new file mode 100644 index 000000000000..3f53ac9f5e62 --- /dev/null +++ b/extensions-contrib/scan-query/pom.xml @@ -0,0 +1,47 @@ + + + + + + + io.druid + druid + 0.9.2-SNAPSHOT + ../../pom.xml + + 4.0.0 + + io.druid.extensions.contrib + scan-query + scan-query + streaming version of select query + + + + io.druid + druid-server + ${project.parent.version} + provided + + + + \ No newline at end of file diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java new file mode 100644 index 000000000000..b40cd260509c --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -0,0 +1,247 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.query.scan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.druid.granularity.QueryGranularity; +import io.druid.query.BaseQuery; +import io.druid.query.DataSource; +import io.druid.query.Query; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.filter.DimFilter; +import io.druid.query.spec.QuerySegmentSpec; + +import java.util.List; +import java.util.Map; + +@JsonTypeName("scan") +public class ScanQuery extends BaseQuery +{ + public static final String SCAN = "scan"; + + private final int batchSize; + private final int limit; + private final QueryGranularity granularity; + private final DimFilter dimFilter; + private final List dimensions; + private final List metrics; + + @JsonCreator + public ScanQuery( + @JsonProperty("dataSource") DataSource dataSource, + @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, + @JsonProperty("batchSize") int batchSize, + @JsonProperty("limit") int limit, + @JsonProperty("filter") DimFilter dimFilter, + @JsonProperty("granularity") QueryGranularity granularity, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metrics") List metrics, + @JsonProperty("context") Map context + ) + { + super(dataSource, querySegmentSpec, false, context); + this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize; + this.limit = (limit == 0) ? 
Integer.MAX_VALUE : limit; + this.dimFilter = dimFilter; + this.granularity = granularity; + this.dimensions = dimensions; + this.metrics = metrics; + } + + @JsonProperty + public int getBatchSize() + { + return batchSize; + } + + @JsonProperty + public int getLimit() + { + return limit; + } + + @Override + public boolean hasFilters() + { + return dimFilter != null; + } + + @Override + public DimFilter getFilter() + { + return dimFilter; + } + + @Override + public String getType() + { + return SCAN; + } + + @JsonProperty("filter") + public DimFilter getDimensionsFilter() + { + return dimFilter; + } + + @JsonProperty + public QueryGranularity getGranularity() + { + return granularity; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public List getMetrics() + { + return metrics; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec) + { + return new ScanQuery( + getDataSource(), + querySegmentSpec, + batchSize, + limit, + dimFilter, + granularity, + dimensions, + metrics, + getContext() + ); + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return new ScanQuery( + dataSource, + getQuerySegmentSpec(), + batchSize, + limit, + dimFilter, + granularity, + dimensions, + metrics, + getContext() + ); + } + + @Override + public Query withOverriddenContext(Map contextOverrides) + { + return new ScanQuery( + getDataSource(), + getQuerySegmentSpec(), + batchSize, + limit, + dimFilter, + granularity, + dimensions, + metrics, + computeOverridenContext(contextOverrides) + ); + } + + public ScanQuery withDimFilter(DimFilter dimFilter) + { + return new ScanQuery( + getDataSource(), + getQuerySegmentSpec(), + batchSize, + limit, + dimFilter, + granularity, + dimensions, + metrics, + getContext() + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if 
(!super.equals(o)) { + return false; + } + + ScanQuery that = (ScanQuery) o; + + if (batchSize != that.batchSize) { + return false; + } + if (limit != that.limit) { + return false; + } + if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) { + return false; + } + if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) { + return false; + } + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { + return false; + } + return metrics != null ? metrics.equals(that.metrics) : that.metrics == null; + } + + @Override + public int hashCode() + { + int result = super.hashCode(); + result = 31 * result + batchSize; + result = 31 * result + limit; + result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); + result = 31 * result + (granularity != null ? granularity.hashCode() : 0); + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + (metrics != null ? metrics.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "ScanQuery{" + + "dataSource='" + getDataSource() + '\'' + + ", querySegmentSpec=" + getQuerySegmentSpec() + + ", descending=" + isDescending() + + ", batchSize=" + batchSize + + ", limit=" + limit + + ", dimFilter=" + dimFilter + + ", granularity=" + granularity + + ", dimensions=" + dimensions + + ", metrics=" + metrics + + '}'; + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java new file mode 100644 index 000000000000..f33ea997b268 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryDruidModule.java @@ -0,0 +1,53 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.query.scan; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.guice.DruidBinders; +import io.druid.guice.LazySingleton; +import io.druid.initialization.DruidModule; + +import java.util.Arrays; +import java.util.List; + +public class ScanQueryDruidModule implements DruidModule { + public void configure(Binder binder) { + DruidBinders.queryToolChestBinder(binder) + .addBinding(ScanQuery.class) + .to(ScanQueryQueryToolChest.class) + .in(LazySingleton.class); + + DruidBinders.queryRunnerFactoryBinder(binder) + .addBinding(ScanQuery.class) + .to(ScanQueryRunnerFactory.class) + .in(LazySingleton.class); + } + + public List getJacksonModules() { + return Arrays.asList( + new SimpleModule("ScanQueryDruidModule") + .registerSubtypes( + new NamedType(ScanQuery.class, ScanQuery.SCAN) + ) + ); + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java new file mode 100644 index 000000000000..64474a377064 --- /dev/null +++ 
b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -0,0 +1,168 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.query.scan; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import com.metamx.common.guava.BaseSequence; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.filter.Filter; +import io.druid.query.select.SelectQueryEngine; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.Segment; +import io.druid.segment.StorageAdapter; +import io.druid.segment.column.Column; +import io.druid.segment.filter.Filters; +import io.druid.timeline.DataSegmentUtils; +import org.joda.time.Interval; + +import java.util.Iterator; +import java.util.List; +import 
java.util.Map; + +public class ScanQueryEngine +{ + public Sequence process(final ScanQuery query, final Segment segment) + { + final StorageAdapter adapter = segment.asStorageAdapter(); + + if (adapter == null) { + throw new ISE( + "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped." + ); + } + + // at the point where this code is called, only one datasource should exist. + String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); + + final Iterable dims; + if (query.getDimensions() == null || query.getDimensions().isEmpty()) { + dims = DefaultDimensionSpec.toSpec(adapter.getAvailableDimensions()); + } else { + dims = query.getDimensions(); + } + + final Iterable metrics; + if (query.getMetrics() == null || query.getMetrics().isEmpty()) { + metrics = adapter.getAvailableMetrics(); + } else { + metrics = query.getMetrics(); + } + final List intervals = query.getQuerySegmentSpec().getIntervals(); + Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals); + + // should be rewritten with given interval + final String segmentId = DataSegmentUtils.withInterval(dataSource, segment.getIdentifier(), intervals.get(0)); + + final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); + + return Sequences.concat( + Sequences.map( + adapter.makeCursors( + filter, + intervals.get(0), + query.getGranularity(), + query.isDescending() + ), + new Function>() + { + @Override + public Sequence apply(final Cursor cursor) + { + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + + final Map dimSelectors = Maps.newHashMap(); + for (DimensionSpec dim : dims) { + final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); + 
dimSelectors.put(dim.getOutputName(), dimSelector); + } + + final Map metSelectors = Maps.newHashMap(); + for (String metric : metrics) { + final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + metSelectors.put(metric, metricSelector); + } + final int batchSize = query.getBatchSize(); + final int limit = query.getLimit(); + return new Iterator() + { + private int offset = 0; + + @Override + public boolean hasNext() + { + return !cursor.isDone() && offset < limit; + } + + @Override + public ScanResultValue next() + { + int lastOffset = offset; + int i = 0; + List> events = Lists.newArrayListWithCapacity(batchSize); + for (; !cursor.isDone() + && i < batchSize + && offset < limit; cursor.advance(), i++, offset++) { + final Map theEvent = SelectQueryEngine.singleEvent( + ScanResultValue.timestampKey, + timestampColumnSelector, + dimSelectors, + metSelectors + ); + events.add(theEvent); + } + return new ScanResultValue(segmentId, lastOffset, events); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public void cleanup(Iterator iterFromMake) + { + } + } + ); + } + } + ) + ); + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java new file mode 100644 index 000000000000..3b44c1aa2440 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -0,0 +1,92 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.query.scan; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.metamx.common.guava.Sequence; +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.query.DruidMetrics; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryToolChest; +import io.druid.query.aggregation.MetricManipulationFn; + +import java.util.Map; + +public class ScanQueryQueryToolChest extends QueryToolChest +{ + private static final TypeReference TYPE_REFERENCE = new TypeReference() + { + }; + + @Override + public QueryRunner mergeResults(final QueryRunner runner) + { + return new QueryRunner() + { + @Override + public Sequence run(final Query input, Map responseContext) + { + // no actually merge + return runner.run(input, responseContext); + } + }; + } + + @Override + public ServiceMetricEvent.Builder makeMetricBuilder(ScanQuery query) + { + return DruidMetrics.makePartialQueryTimeMetric(query); + } + + @Override + public Function makePreComputeManipulatorFn( + ScanQuery query, MetricManipulationFn fn + ) + { + return Functions.identity(); + } + + @Override + public TypeReference getResultTypeReference() + { + return TYPE_REFERENCE; + } + + @Override + public QueryRunner preMergeQueryDecoration(final QueryRunner runner) + { + return new QueryRunner() + { + @Override + public Sequence run( + Query query, Map responseContext + ) + { + ScanQuery scanQuery = (ScanQuery) query; + if 
(scanQuery.getDimensionsFilter() != null) { + scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize()); + } + return runner.run(scanQuery, responseContext); + } + }; + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java new file mode 100644 index 000000000000..850d7242e115 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.query.scan; + +import com.google.inject.Inject; +import com.metamx.common.ISE; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.query.ConcatQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryToolChest; +import io.druid.segment.Segment; + +import java.util.Map; +import java.util.concurrent.ExecutorService; + +public class ScanQueryRunnerFactory implements QueryRunnerFactory +{ + private final ScanQueryQueryToolChest toolChest; + private final ScanQueryEngine engine; + + @Inject + public ScanQueryRunnerFactory( + ScanQueryQueryToolChest toolChest, + ScanQueryEngine engine + ) + { + this.toolChest = toolChest; + this.engine = engine; + } + + @Override + public QueryRunner createRunner(Segment segment) + { + return new ScanQueryRunner(engine, segment); + } + + @Override + public QueryRunner mergeRunners( + ExecutorService queryExecutor, + Iterable> queryRunners + ) + { + // in single thread and in jetty thread instead of processing thread + return new ConcatQueryRunner<>(Sequences.simple(queryRunners)); + } + + @Override + public QueryToolChest getToolchest() + { + return toolChest; + } + + private class ScanQueryRunner implements QueryRunner + { + private final ScanQueryEngine engine; + private final Segment segment; + + public ScanQueryRunner(ScanQueryEngine engine, Segment segment) + { + this.engine = engine; + this.segment = segment; + } + + @Override + public Sequence run( + Query query, Map responseContext + ) + { + if (!(query instanceof ScanQuery)) { + throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class); + } + + return engine.process((ScanQuery) query, segment); + } + } +} diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java new 
file mode 100644 index 000000000000..0b5fd7ec33a7 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -0,0 +1,120 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.query.scan; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + */ +public class ScanResultValue implements Comparable +{ + public static final String timestampKey = "timestamp"; + + private final String segmentId; + private final int offset; + private final List> events; + + @JsonCreator + public ScanResultValue( + @JsonProperty("segmentId") String segmentId, + @JsonProperty("offset") int offset, + @JsonProperty("events") List> events + ) + { + this.segmentId = segmentId; + this.offset = offset; + this.events = events; + } + + @JsonProperty + public String getSegmentId() + { + return segmentId; + } + + @JsonProperty + public int getOffset() + { + return offset; + } + + @JsonProperty + public List> getEvents() + { + return events; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != 
o.getClass()) { + return false; + } + + ScanResultValue that = (ScanResultValue) o; + + if (offset != that.offset) { + return false; + } + if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) { + return false; + } + return events != null ? events.equals(that.events) : that.events == null; + + } + + @Override + public int hashCode() { + int result = segmentId != null ? segmentId.hashCode() : 0; + result = 31 * result + offset; + result = 31 * result + (events != null ? events.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "ScanResultValue{" + + "segmentId='" + segmentId + '\'' + + ", offset=" + offset + + ", events=" + events + + '}'; + } + + @Override + public int compareTo(ScanResultValue that) + { + if (that == null) { + return 1; + } + if (segmentId == null && that.segmentId == null) { + return Integer.compare(offset, that.offset); + } + if (segmentId != null && that.segmentId != null) { + return segmentId.compareTo(that.segmentId); + } + return segmentId != null ? 
1 : 0; + } +} diff --git a/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..1459501bf4a8 --- /dev/null +++ b/extensions-contrib/scan-query/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.query.scan.ScanQueryDruidModule \ No newline at end of file diff --git a/pom.xml b/pom.xml index 65e56988b509..685930c808fc 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,7 @@ extensions-contrib/virtual-columns extensions-contrib/thrift-extensions extensions-contrib/ambari-metrics-emitter + extensions-contrib/scan-query diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index 411e5db119c6..f9c7f62e2506 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -202,23 +202,12 @@ public Result apply(Cursor cursor) int lastOffset = offset.startOffset(); for (; !cursor.isDone() && offset.hasNext(); cursor.advance(), offset.next()) { - final Map theEvent = Maps.newLinkedHashMap(); - theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get())); - - for (ColumnSelectorPlus selectorPlus : selectorPlusList) { - selectorPlus.getColumnSelectorStrategy().addRowValuesToSelectResult(selectorPlus.getOutputName(), selectorPlus.getSelector(), theEvent); - } - - for (Map.Entry metSelector : metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final ObjectColumnSelector selector = metSelector.getValue(); - - if (selector == null) { - theEvent.put(metric, null); - } else { - theEvent.put(metric, selector.get()); - } - } + final Map theEvent = singleEvent( + EventHolder.timestampKey, + 
timestampColumnSelector, + dimSelectors, + metSelectors + ); builder.addEntry( new EventHolder( @@ -236,4 +225,31 @@ public Result apply(Cursor cursor) } ); } + + public static Map singleEvent( + String timestampKey, + LongColumnSelector timestampColumnSelector, + Map dimSelectors, + Map metSelectors + ) + { + final Map theEvent = Maps.newLinkedHashMap(); + theEvent.put(timestampKey, new DateTime(timestampColumnSelector.get())); + + for (ColumnSelectorPlus selectorPlus : selectorPlusList) { + selectorPlus.getColumnSelectorStrategy().addRowValuesToSelectResult(selectorPlus.getOutputName(), selectorPlus.getSelector(), theEvent); + } + + for (Map.Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final ObjectColumnSelector selector = metSelector.getValue(); + + if (selector == null) { + theEvent.put(metric, null); + } else { + theEvent.put(metric, selector.get()); + } + } + return theEvent; + } } From a0e2cfa805a97dde1835d0a79d062a40de083d6c Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 31 Aug 2016 15:21:36 +0800 Subject: [PATCH 02/12] use columns instead of dimensions and metrics;prepare for valueVector;remove granularity --- .../java/io/druid/query/scan/ScanQuery.java | 77 +++++++------------ .../io/druid/query/scan/ScanQueryEngine.java | 47 +++++++---- .../io/druid/query/scan/ScanResultValue.java | 7 +- 3 files changed, 62 insertions(+), 69 deletions(-) diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java index b40cd260509c..3f1dc0bf818b 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import 
io.druid.granularity.QueryGranularity; import io.druid.query.BaseQuery; import io.druid.query.DataSource; import io.druid.query.Query; @@ -37,33 +36,36 @@ public class ScanQuery extends BaseQuery { public static final String SCAN = "scan"; + private final String resultFormat; private final int batchSize; private final int limit; - private final QueryGranularity granularity; private final DimFilter dimFilter; - private final List dimensions; - private final List metrics; + private final List columns; @JsonCreator public ScanQuery( @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, + @JsonProperty("resultFormat") String resultFormat, @JsonProperty("batchSize") int batchSize, @JsonProperty("limit") int limit, @JsonProperty("filter") DimFilter dimFilter, - @JsonProperty("granularity") QueryGranularity granularity, - @JsonProperty("dimensions") List dimensions, - @JsonProperty("metrics") List metrics, + @JsonProperty("columns") List columns, @JsonProperty("context") Map context ) { super(dataSource, querySegmentSpec, false, context); + this.resultFormat = resultFormat; this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize; this.limit = (limit == 0) ? 
Integer.MAX_VALUE : limit; this.dimFilter = dimFilter; - this.granularity = granularity; - this.dimensions = dimensions; - this.metrics = metrics; + this.columns = columns; + } + + @JsonProperty + public String getResultFormat() + { + return resultFormat; } @JsonProperty @@ -103,21 +105,9 @@ public DimFilter getDimensionsFilter() } @JsonProperty - public QueryGranularity getGranularity() - { - return granularity; - } - - @JsonProperty - public List getDimensions() + public List getColumns() { - return dimensions; - } - - @JsonProperty - public List getMetrics() - { - return metrics; + return columns; } @Override @@ -126,12 +116,11 @@ public Query withQuerySegmentSpec(QuerySegmentSpec querySegment return new ScanQuery( getDataSource(), querySegmentSpec, + resultFormat, batchSize, limit, dimFilter, - granularity, - dimensions, - metrics, + columns, getContext() ); } @@ -142,12 +131,11 @@ public Query withDataSource(DataSource dataSource) return new ScanQuery( dataSource, getQuerySegmentSpec(), + resultFormat, batchSize, limit, dimFilter, - granularity, - dimensions, - metrics, + columns, getContext() ); } @@ -158,12 +146,11 @@ public Query withOverriddenContext(Map contextO return new ScanQuery( getDataSource(), getQuerySegmentSpec(), + resultFormat, batchSize, limit, dimFilter, - granularity, - dimensions, - metrics, + columns, computeOverridenContext(contextOverrides) ); } @@ -173,12 +160,11 @@ public ScanQuery withDimFilter(DimFilter dimFilter) return new ScanQuery( getDataSource(), getQuerySegmentSpec(), + resultFormat, batchSize, limit, dimFilter, - granularity, - dimensions, - metrics, + columns, getContext() ); } @@ -204,28 +190,24 @@ public boolean equals(Object o) if (limit != that.limit) { return false; } - if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) { - return false; - } - if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) { + if (resultFormat != null ? 
!resultFormat.equals(that.resultFormat) : that.resultFormat != null) { return false; } - if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { + if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) { return false; } - return metrics != null ? metrics.equals(that.metrics) : that.metrics == null; + return columns != null ? columns.equals(that.columns) : that.columns == null; } @Override public int hashCode() { int result = super.hashCode(); + result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0); result = 31 * result + batchSize; result = 31 * result + limit; result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); - result = 31 * result + (granularity != null ? granularity.hashCode() : 0); - result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); - result = 31 * result + (metrics != null ? metrics.hashCode() : 0); + result = 31 * result + (columns != null ? columns.hashCode() : 0); return result; } @@ -236,12 +218,11 @@ public String toString() "dataSource='" + getDataSource() + '\'' + ", querySegmentSpec=" + getQuerySegmentSpec() + ", descending=" + isDescending() + + ", resultFormat='" + resultFormat + '\'' + ", batchSize=" + batchSize + ", limit=" + limit + ", dimFilter=" + dimFilter + - ", granularity=" + granularity + - ", dimensions=" + dimensions + - ", metrics=" + metrics + + ", columns=" + columns + '}'; } } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 64474a377064..9235461f69d0 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -27,6 +27,7 @@ import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.Sequence; import 
com.metamx.common.guava.Sequences; +import io.druid.granularity.QueryGranularities; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.Filter; @@ -61,19 +62,15 @@ public Sequence process(final ScanQuery query, final Segment se // at the point where this code is called, only one datasource should exist. String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); - final Iterable dims; - if (query.getDimensions() == null || query.getDimensions().isEmpty()) { - dims = DefaultDimensionSpec.toSpec(adapter.getAvailableDimensions()); - } else { - dims = query.getDimensions(); + List allDims = Lists.newLinkedList(adapter.getAvailableDimensions()); + List allMetrics = Lists.newLinkedList(adapter.getAvailableMetrics()); + if (query.getColumns() != null && !query.getColumns().isEmpty()) { + allDims.retainAll(query.getColumns()); + allMetrics.retainAll(query.getColumns()); } + final List dims = DefaultDimensionSpec.toSpec(allDims); + final List metrics = allMetrics; - final Iterable metrics; - if (query.getMetrics() == null || query.getMetrics().isEmpty()) { - metrics = adapter.getAvailableMetrics(); - } else { - metrics = query.getMetrics(); - } final List intervals = query.getQuerySegmentSpec().getIntervals(); Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals); @@ -87,7 +84,7 @@ public Sequence process(final ScanQuery query, final Segment se adapter.makeCursors( filter, intervals.get(0), - query.getGranularity(), + QueryGranularities.ALL, query.isDescending() ), new Function>() @@ -130,6 +127,23 @@ public boolean hasNext() public ScanResultValue next() { int lastOffset = offset; + Object events = null; + String resultFormat = query.getResultFormat(); + if ("valueVector".equals(resultFormat)) { + events = rowsToValueVector(); + } else { + events = rowsToList(); + } + return new ScanResultValue(segmentId, lastOffset, 
events); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + private Object rowsToList() { int i = 0; List> events = Lists.newArrayListWithCapacity(batchSize); for (; !cursor.isDone() @@ -143,13 +157,12 @@ public ScanResultValue next() ); events.add(theEvent); } - return new ScanResultValue(segmentId, lastOffset, events); + return events; } - @Override - public void remove() - { - throw new UnsupportedOperationException(); + private Object rowsToValueVector() { + // only support list now, we can support ValueVector or Arrow in future + return rowsToList(); } }; } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java index 0b5fd7ec33a7..f8edcd4b28d1 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -32,13 +32,13 @@ public class ScanResultValue implements Comparable private final String segmentId; private final int offset; - private final List> events; + private final Object events; @JsonCreator public ScanResultValue( @JsonProperty("segmentId") String segmentId, @JsonProperty("offset") int offset, - @JsonProperty("events") List> events + @JsonProperty("events") Object events ) { this.segmentId = segmentId; @@ -59,7 +59,7 @@ public int getOffset() } @JsonProperty - public List> getEvents() + public Object getEvents() { return events; } @@ -82,7 +82,6 @@ public boolean equals(Object o) { return false; } return events != null ? 
events.equals(that.events) : that.events == null; - } @Override From 40f42eb1010f1f69655d4478fb9f98a7c511ba2c Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 7 Sep 2016 15:40:07 +0800 Subject: [PATCH 03/12] respect query limit within historical --- .../io/druid/query/scan/ScanQueryEngine.java | 24 ++++++++++++--- .../query/scan/ScanQueryRunnerFactory.java | 29 ++++++++++++++++--- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 9235461f69d0..28f42852df07 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -49,8 +49,18 @@ public class ScanQueryEngine { - public Sequence process(final ScanQuery query, final Segment segment) + public Sequence process( + final ScanQuery query, + final Segment segment, + final Map responseContext + ) { + if (responseContext.get("count") != null) { + int count = (int) responseContext.get("count"); + if (count >= query.getLimit()) { + return Sequences.empty(); + } + } final StorageAdapter adapter = segment.asStorageAdapter(); if (adapter == null) { @@ -79,6 +89,10 @@ public Sequence process(final ScanQuery query, final Segment se final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); + if (responseContext.get("count") == null) { + responseContext.put("count", 0); + } + final int limit = query.getLimit() - (int) responseContext.get("count"); return Sequences.concat( Sequences.map( adapter.makeCursors( @@ -112,7 +126,6 @@ public Iterator make() metSelectors.put(metric, metricSelector); } final int batchSize = query.getBatchSize(); - final int limit = query.getLimit(); return new Iterator() { private int offset = 0; @@ -134,6 +147,7 @@ public 
ScanResultValue next() } else { events = rowsToList(); } + responseContext.put("count", (int) responseContext.get("count") + (offset - lastOffset)); return new ScanResultValue(segmentId, lastOffset, events); } @@ -143,7 +157,8 @@ public void remove() throw new UnsupportedOperationException(); } - private Object rowsToList() { + private Object rowsToList() + { int i = 0; List> events = Lists.newArrayListWithCapacity(batchSize); for (; !cursor.isDone() @@ -160,7 +175,8 @@ private Object rowsToList() { return events; } - private Object rowsToValueVector() { + private Object rowsToValueVector() + { // only support list now, we can support ValueVector or Arrow in future return rowsToList(); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 850d7242e115..7c51848516c3 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -18,11 +18,11 @@ */ package io.druid.query.scan; +import com.google.common.base.Function; import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; -import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -56,11 +56,32 @@ public QueryRunner createRunner(Segment segment) @Override public QueryRunner mergeRunners( ExecutorService queryExecutor, - Iterable> queryRunners + final Iterable> queryRunners ) { // in single thread and in jetty thread instead of processing thread - return new ConcatQueryRunner<>(Sequences.simple(queryRunners)); + return new QueryRunner() + { + @Override + public Sequence run( + final Query query, final Map responseContext + ) + { + return 
Sequences.concat( + Sequences.map( + Sequences.simple(queryRunners), + new Function, Sequence>() + { + @Override + public Sequence apply(final QueryRunner input) + { + return input.run(query, responseContext); + } + } + ) + ); + } + }; } @Override @@ -89,7 +110,7 @@ public Sequence run( throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class); } - return engine.process((ScanQuery) query, segment); + return engine.process((ScanQuery) query, segment, responseContext); } } } From a86ef3bdb986a6b4f38cb31dc0170283fff4a769 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Sat, 10 Sep 2016 19:25:29 +0800 Subject: [PATCH 04/12] use constant --- .../src/main/java/io/druid/query/scan/ScanQuery.java | 2 ++ .../src/main/java/io/druid/query/scan/ScanQueryEngine.java | 4 ++-- .../src/main/java/io/druid/query/scan/ScanResultValue.java | 2 -- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java index 3f1dc0bf818b..c40ad245d3b0 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -35,6 +35,8 @@ public class ScanQuery extends BaseQuery { public static final String SCAN = "scan"; + public static final String RESULT_FORMAT_LIST = "list"; + public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector"; private final String resultFormat; private final int batchSize; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 28f42852df07..bae7edaca16a 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java 
@@ -142,8 +142,8 @@ public ScanResultValue next() int lastOffset = offset; Object events = null; String resultFormat = query.getResultFormat(); - if ("valueVector".equals(resultFormat)) { - events = rowsToValueVector(); + if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) { + throw new UnsupportedOperationException("valueVector is not supported now"); } else { events = rowsToList(); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java index f8edcd4b28d1..1f24f6250025 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; -/** - */ public class ScanResultValue implements Comparable { public static final String timestampKey = "timestamp"; From 3e96e61cba59d646b71e22507c7748536195e247 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 2 Nov 2016 00:22:59 +0800 Subject: [PATCH 05/12] fix thread name corrupted bug when using jetty qtp thread rather than processing thread while working with SpecificSegmentQueryRunner --- extensions-contrib/scan-query/pom.xml | 4 +- .../io/druid/query/scan/ScanQueryEngine.java | 6 +-- .../query/scan/ScanQueryQueryToolChest.java | 2 +- .../query/scan/ScanQueryRunnerFactory.java | 4 +- .../io/druid/query/scan/ScanResultValue.java | 3 -- .../java/io/druid/server/QueryResource.java | 39 +++++++++++-------- 6 files changed, 30 insertions(+), 28 deletions(-) diff --git a/extensions-contrib/scan-query/pom.xml b/extensions-contrib/scan-query/pom.xml index 3f53ac9f5e62..c19b24619ce4 100644 --- a/extensions-contrib/scan-query/pom.xml +++ b/extensions-contrib/scan-query/pom.xml @@ -25,7 +25,7 @@ io.druid druid - 0.9.2-SNAPSHOT + 0.9.3-SNAPSHOT ../../pom.xml 4.0.0 @@ -44,4 +44,4 @@ - \ No newline at end of file 
+ diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index bae7edaca16a..b6081a4ab68d 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -24,10 +24,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.metamx.common.ISE; -import com.metamx.common.guava.BaseSequence; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.guava.BaseSequence; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.Filter; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java index 3b44c1aa2440..052a83b48cea 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import com.google.common.base.Functions; -import com.metamx.common.guava.Sequence; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.java.util.common.guava.Sequence; import io.druid.query.DruidMetrics; import io.druid.query.Query; import io.druid.query.QueryRunner; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java 
b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 7c51848516c3..4523a038a7c6 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -21,8 +21,8 @@ import com.google.common.base.Function; import com.google.inject.Inject; import com.metamx.common.ISE; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java index 1f24f6250025..3e6e09787df4 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -21,9 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.List; -import java.util.Map; - public class ScanResultValue implements Comparable { public static final String timestampKey = "timestamp"; diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index cee47b3def78..c946f5391375 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -250,9 +250,10 @@ public Object accumulate(Object accumulated, Object in) @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { - // json serializer will always close the yielder - CountingOutputStream os = new 
CountingOutputStream(outputStream); - jsonWriter.writeValue(os, yielder); + try { + // json serializer will always close the yielder + CountingOutputStream os = new CountingOutputStream(outputStream); + jsonWriter.writeValue(os, yielder); os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. os.close(); @@ -268,20 +269,24 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE .build("query/bytes", os.getCount()) ); - requestLogger.log( - new RequestLogLine( - new DateTime(start), - req.getRemoteAddr(), - theQuery, - new QueryStats( - ImmutableMap.of( - "query/time", queryTime, - "query/bytes", os.getCount(), - "success", true - ) - ) - ) - ); + requestLogger.log( + new RequestLogLine( + new DateTime(start), + req.getRemoteAddr(), + theQuery, + new QueryStats( + ImmutableMap.of( + "query/time", queryTime, + "query/bytes", os.getCount(), + "success", true + ) + ) + ) + ); + } + finally { + Thread.currentThread().setName(currThreadName); + } } }, context.getContentType() From 66e30e8776230a70cb5c568c184e4e50b365aaf9 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 22 Nov 2016 02:22:59 +0800 Subject: [PATCH 06/12] add some test for scan query --- extensions-contrib/scan-query/pom.xml | 13 +- .../java/io/druid/query/scan/ScanQuery.java | 163 +++++- .../io/druid/query/scan/ScanQueryEngine.java | 9 +- .../query/scan/ScanQueryRunnerFactory.java | 2 +- .../io/druid/query/scan/ScanResultValue.java | 29 +- .../druid/query/scan/ScanQueryRunnerTest.java | 512 ++++++++++++++++++ .../druid/query/scan/ScanQuerySpecTest.java | 75 +++ 7 files changed, 783 insertions(+), 20 deletions(-) create mode 100644 extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java create mode 100644 extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java diff --git a/extensions-contrib/scan-query/pom.xml b/extensions-contrib/scan-query/pom.xml index 
c19b24619ce4..183ddbf68da3 100644 --- a/extensions-contrib/scan-query/pom.xml +++ b/extensions-contrib/scan-query/pom.xml @@ -40,7 +40,18 @@ io.druid druid-server ${project.parent.version} - provided + + + junit + junit + test + + + io.druid + druid-processing + ${project.parent.version} + tests + test diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java index c40ad245d3b0..c74e41ff2c39 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -21,13 +21,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import io.druid.query.BaseQuery; import io.druid.query.DataSource; import io.druid.query.Query; -import io.druid.query.dimension.DimensionSpec; +import io.druid.query.TableDataSource; import io.druid.query.filter.DimFilter; +import io.druid.query.filter.InDimFilter; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.spec.LegacySegmentSpec; import io.druid.query.spec.QuerySegmentSpec; +import org.joda.time.Interval; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -57,9 +64,11 @@ public ScanQuery( ) { super(dataSource, querySegmentSpec, false, context); - this.resultFormat = resultFormat; + this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat; this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize; this.limit = (limit == 0) ? 
Integer.MAX_VALUE : limit; + Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0"); + Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0"); this.dimFilter = dimFilter; this.columns = columns; } @@ -227,4 +236,154 @@ public String toString() ", columns=" + columns + '}'; } + + /** + * A Builder for ScanQuery. + *

+ * Required: dataSource(), intervals() must be called before build() + *

+ * Usage example: + *


+   *   ScanQuery query = new ScanQueryBuilder()
+   *                                  .dataSource("Example")
+   *                                  .interval("2010/2013")
+   *                                  .build();
+   * 
+ * + * @see io.druid.query.scan.ScanQuery + */ + public static class ScanQueryBuilder + { + private DataSource dataSource; + private QuerySegmentSpec querySegmentSpec; + private Map context; + private String resultFormat; + private int batchSize; + private int limit; + private DimFilter dimFilter; + private List columns; + + public ScanQueryBuilder() + { + dataSource = null; + querySegmentSpec = null; + context = null; + resultFormat = null; + batchSize = 0; + limit = 0; + dimFilter = null; + columns = Lists.newArrayList(); + } + + public ScanQuery build() + { + return new ScanQuery( + dataSource, + querySegmentSpec, + resultFormat, + batchSize, + limit, + dimFilter, + columns, + context + ); + } + + public ScanQueryBuilder copy(ScanQueryBuilder builder) + { + return new ScanQueryBuilder() + .dataSource(builder.dataSource) + .intervals(builder.querySegmentSpec) + .context(builder.context); + } + + public ScanQueryBuilder dataSource(String ds) + { + dataSource = new TableDataSource(ds); + return this; + } + + public ScanQueryBuilder dataSource(DataSource ds) + { + dataSource = ds; + return this; + } + + public ScanQueryBuilder intervals(QuerySegmentSpec q) + { + querySegmentSpec = q; + return this; + } + + public ScanQueryBuilder intervals(String s) + { + querySegmentSpec = new LegacySegmentSpec(s); + return this; + } + + public ScanQueryBuilder intervals(List l) + { + querySegmentSpec = new LegacySegmentSpec(l); + return this; + } + + public ScanQueryBuilder context(Map c) + { + context = c; + return this; + } + + public ScanQueryBuilder resultFormat(String r) + { + resultFormat = r; + return this; + } + + public ScanQueryBuilder batchSize(int b) + { + batchSize = b; + return this; + } + + public ScanQueryBuilder limit(int l) + { + limit = l; + return this; + } + + public ScanQueryBuilder filters(String dimensionName, String value) + { + dimFilter = new SelectorDimFilter(dimensionName, value, null); + return this; + } + + public ScanQueryBuilder filters(String 
dimensionName, String value, String... values) + { + dimFilter = new InDimFilter(dimensionName, Lists.asList(value, values), null); + return this; + } + + public ScanQueryBuilder filters(DimFilter f) + { + dimFilter = f; + return this; + } + + public ScanQueryBuilder columns(List c) + { + columns = c; + return this; + } + + public ScanQueryBuilder columns(String... c) + { + columns = Arrays.asList(c); + return this; + } + } + + public static ScanQueryBuilder newScanQueryBuilder() + { + return new ScanQueryBuilder(); + } } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index b6081a4ab68d..ef2e7a2b765c 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -23,8 +23,9 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.metamx.common.ISE; +import com.google.common.collect.Sets; import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -46,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; public class ScanQueryEngine { @@ -112,18 +114,21 @@ public Sequence apply(final Cursor cursor) @Override public Iterator make() { + final Set columns = Sets.newHashSet(); final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); final Map dimSelectors = Maps.newHashMap(); for (DimensionSpec dim : dims) { final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); dimSelectors.put(dim.getOutputName(), dimSelector); + 
columns.add(dim.getOutputName()); } final Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); metSelectors.put(metric, metricSelector); + columns.add(metric); } final int batchSize = query.getBatchSize(); return new Iterator() @@ -148,7 +153,7 @@ public ScanResultValue next() events = rowsToList(); } responseContext.put("count", (int) responseContext.get("count") + (offset - lastOffset)); - return new ScanResultValue(segmentId, lastOffset, events); + return new ScanResultValue(segmentId, columns, events); } @Override diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 4523a038a7c6..012d448768c8 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -20,7 +20,7 @@ import com.google.common.base.Function; import com.google.inject.Inject; -import com.metamx.common.ISE; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java index 3e6e09787df4..4a834663bb3b 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -21,23 +21,25 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Set; + public class ScanResultValue implements Comparable { public static final String 
timestampKey = "timestamp"; private final String segmentId; - private final int offset; + private final Set columns; private final Object events; @JsonCreator public ScanResultValue( @JsonProperty("segmentId") String segmentId, - @JsonProperty("offset") int offset, + @JsonProperty("columns") Set columns, @JsonProperty("events") Object events ) { this.segmentId = segmentId; - this.offset = offset; + this.columns = columns; this.events = events; } @@ -48,9 +50,9 @@ public String getSegmentId() } @JsonProperty - public int getOffset() + public Set getColumns() { - return offset; + return columns; } @JsonProperty @@ -60,7 +62,8 @@ public Object getEvents() } @Override - public boolean equals(Object o) { + public boolean equals(Object o) + { if (this == o) { return true; } @@ -70,19 +73,20 @@ public boolean equals(Object o) { ScanResultValue that = (ScanResultValue) o; - if (offset != that.offset) { + if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) { return false; } - if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) { + if (columns != null ? !columns.equals(that.columns) : that.columns != null) { return false; } return events != null ? events.equals(that.events) : that.events == null; } @Override - public int hashCode() { + public int hashCode() + { int result = segmentId != null ? segmentId.hashCode() : 0; - result = 31 * result + offset; + result = 31 * result + (columns != null ? columns.hashCode() : 0); result = 31 * result + (events != null ? 
events.hashCode() : 0); return result; } @@ -92,7 +96,7 @@ public String toString() { return "ScanResultValue{" + "segmentId='" + segmentId + '\'' + - ", offset=" + offset + + ", columns=" + columns + ", events=" + events + '}'; } @@ -103,9 +107,6 @@ public int compareTo(ScanResultValue that) if (that == null) { return 1; } - if (segmentId == null && that.segmentId == null) { - return Integer.compare(offset, that.offset); - } if (segmentId != null && that.segmentId != null) { return segmentId.compareTo(that.segmentId); } diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java new file mode 100644 index 000000000000..74a2ceb1d9d2 --- /dev/null +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java @@ -0,0 +1,512 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.scan; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.ObjectArrays; +import com.google.common.collect.Sets; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.TableDataSource; +import io.druid.query.extraction.MapLookupExtractor; +import io.druid.query.filter.AndDimFilter; +import io.druid.query.filter.DimFilter; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.lookup.LookupExtractionFn; +import io.druid.query.spec.LegacySegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + */ +@RunWith(Parameterized.class) +public class ScanQueryRunnerTest +{ + // copied from druid.sample.tsv + public static final String[] V_0112 = { + "2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000", + "2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000", + "2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000", + "2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000", + "2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000", + 
"2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000", + "2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000", + "2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000", + "2011-01-12T00:00:00.000Z upfront mezzanine preferred mpreferred 800.000000 value", + "2011-01-12T00:00:00.000Z upfront premium preferred ppreferred 800.000000 value" + }; + public static final String[] V_0113 = { + "2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713", + "2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399", + "2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299", + "2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403", + "2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767", + "2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683", + "2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011", + "2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672", + "2011-01-13T00:00:00.000Z spot travel preferred tpreferred 106.236928", + "2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505", + "2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875", + "2011-01-13T00:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value", + "2011-01-13T00:00:00.000Z upfront premium preferred ppreferred 1564.617729 value" + }; + + public static final QuerySegmentSpec I_0112_0114 = new LegacySegmentSpec( + new Interval("2011-01-12/2011-01-14") + ); + public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class); + + private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(); + + @Parameterized.Parameters(name = "{0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.cartesian( + QueryRunnerTestHelper.makeQueryRunners( + new 
ScanQueryRunnerFactory( + toolChest, + new ScanQueryEngine() + ) + ) + ); + } + + private final QueryRunner runner; + + public ScanQueryRunnerTest(QueryRunner runner) + { + this.runner = runner; + } + + private ScanQuery.ScanQueryBuilder newTestQuery() + { + return ScanQuery.newScanQueryBuilder() + .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) + .columns(Arrays.asList()) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .limit(3); + } + + @Test + public void testFullOnSelect() + { + List columns = Lists.newArrayList( + "market", + "quality", + "placement", + "placementish", + "partial_null_column", + "null_column", + "index", + "quality_uniques", + "indexMin", + "indexMaxPlusTen" + ); + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List expectedResults = toExpected( + toFullEvents(V_0112_0114), + columns, + 0, + 3 + ); + verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column")); + } + + @Test + public void testSelectWithDimsAndMets() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List expectedResults = toExpected( + toEvents( + new String[]{ + ScanResultValue.timestampKey + ":TIME", + QueryRunnerTestHelper.marketDimension + ":STRING", + null, + null, + null, + QueryRunnerTestHelper.indexMetric + ":FLOAT" + }, + V_0112_0114 + ), + Lists.newArrayList("market", "index"), + 0, + 3 + ); + verify(expectedResults, results); + } + + @Test + public void testFullOnSelectWithFilterAndLimit() + { + // limits + for (int limit : new int[]{3, 1, 5, 7, 0}) { + ScanQuery query = newTestQuery() + 
.intervals(I_0112_0114) + .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) + .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .limit(limit) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + final List>> events = toEvents( + new String[]{ + ScanResultValue.timestampKey + ":TIME", + null, + QueryRunnerTestHelper.qualityDimension + ":STRING", + null, + null, + QueryRunnerTestHelper.indexMetric + ":FLOAT" + }, + // filtered values with day granularity + new String[]{ + "2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000", + "2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000", + "2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000", + "2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000", + "2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000", + "2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000" + }, + new String[]{ + "2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713", + "2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399", + "2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299", + "2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403", + "2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767", + "2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683", + "2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011", + "2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672", + "2011-01-13T00:00:00.000Z spot travel 
preferred tpreferred 106.236928" + } + ); + + List expectedResults = toExpected( + events, + Lists.newArrayList("quality", "index"), + 0, + limit + ); + verify(expectedResults, results); + } + } + + @Test + public void testSelectWithFilterLookupExtractionFn() + { + + Map extractionMap = new HashMap<>(); + extractionMap.put("total_market", "replaced"); + MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap, false); + LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true); + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "replaced", lookupExtractionFn)) + .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .build(); + + Iterable results = Sequences.toList( + runner.run(query, Maps.newHashMap()), + Lists.newArrayList() + ); + Iterable resultsOptimize = Sequences.toList( + toolChest.postMergeQueryDecoration(toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))). 
+ run(query, Maps.newHashMap()), Lists.newArrayList() + ); + + final List>> events = toEvents( + new String[]{ + ScanResultValue.timestampKey + ":TIME", + null, + QueryRunnerTestHelper.qualityDimension + ":STRING", + null, + null, + QueryRunnerTestHelper.indexMetric + ":FLOAT" + }, + // filtered values with day granularity + new String[]{ + "2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000", + "2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000" + }, + new String[]{ + "2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505", + "2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875" + } + ); + + List expectedResults = toExpected( + events, + Lists.newArrayList(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric), + 0, + 3 + ); + + verify(expectedResults, results); + verify(expectedResults, resultsOptimize); + } + + @Test + public void testFullSelectNoResults() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .filters( + new AndDimFilter( + Arrays.asList( + new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null), + new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "foo", null) + ) + ) + ) + .build(); + + Iterable results = Sequences.toList( + runner.run(query, Maps.newHashMap()), + Lists.newArrayList() + ); + + List expectedResults = Arrays.asList( + ); + + verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column")); + } + + @Test + public void testFullSelectNoDimensionAndMetric() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .columns("foo", "foo2") + .build(); + + Iterable results = Sequences.toList( + runner.run(query, Maps.newHashMap()), + Lists.newArrayList() + ); + + final List>> events = toEvents( + new String[]{ + ScanResultValue.timestampKey + ":TIME" + }, + V_0112_0114 + ); + + List expectedResults = toExpected( + events, + 
Lists.newArrayList(), + 0, + 3 + ); + verify(expectedResults, results); + } + + private List>> toFullEvents(final String[]... valueSet) + { + return toEvents( + new String[]{ + ScanResultValue.timestampKey + ":TIME", + QueryRunnerTestHelper.marketDimension + ":STRING", + QueryRunnerTestHelper.qualityDimension + ":STRING", + QueryRunnerTestHelper.placementDimension + ":STRING", + QueryRunnerTestHelper.placementishDimension + ":STRINGS", + QueryRunnerTestHelper.indexMetric + ":FLOAT", + QueryRunnerTestHelper.partialNullDimension + ":STRING" + }, + valueSet + ); + } + + private List>> toEvents(final String[] dimSpecs, final String[]... valueSet) + { + List values = Lists.newArrayList(); + for (String[] vSet : valueSet) { + values.addAll(Arrays.asList(vSet)); + } + List>> events = Lists.newArrayList(); + events.add( + Lists.newArrayList( + Iterables.transform( + values, new Function>() + { + @Override + public Map apply(String input) + { + Map event = Maps.newHashMap(); + String[] values = input.split("\\t"); + for (int i = 0; i < dimSpecs.length; i++) { + if (dimSpecs[i] == null || i >= dimSpecs.length || i >= values.length) { + continue; + } + String[] specs = dimSpecs[i].split(":"); + event.put( + specs[0], + specs.length == 1 || specs[1].equals("STRING") ? values[i] : + specs[1].equals("TIME") ? new DateTime(values[i]) : + specs[1].equals("FLOAT") ? Float.valueOf(values[i]) : + specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) : + specs[1].equals("LONG") ? Long.valueOf(values[i]) : + specs[1].equals("NULL") ? null : + specs[1].equals("STRINGS") ? 
Arrays.asList(values[i].split("\u0001")) : + values[i] + ); + } + return event; + } + } + ) + ) + ); + return events; + } + + private List toExpected( + List>> targets, + List columns, + final int offset, + final int limit + ) + { + List expected = Lists.newArrayListWithExpectedSize(targets.size()); + for (List> group : targets) { + List> events = Lists.newArrayListWithExpectedSize(limit); + int end = Math.min(group.size(), offset + limit); + if (end == 0) { + end = group.size(); + } + events.addAll(group.subList(offset, end)); + expected.add( + new ScanResultValue( + QueryRunnerTestHelper.segmentId, + Sets.newHashSet(columns), + events + ) + ); + } + return expected; + } + + private static void verify( + Iterable expectedResults, + Iterable actualResults + ) + { + Iterator expectedIter = expectedResults.iterator(); + Iterator actualIter = actualResults.iterator(); + + while (expectedIter.hasNext()) { + ScanResultValue expected = expectedIter.next(); + ScanResultValue actual = actualIter.next(); + + Assert.assertEquals(expected.getSegmentId(), actual.getSegmentId()); + + Assert.assertEquals(expected.getColumns(), actual.getColumns()); + + Iterator> expectedEvts = ((List>) expected.getEvents()).iterator(); + Iterator> actualEvts = ((List>) actual.getEvents()).iterator(); + + while (expectedEvts.hasNext()) { + Map exHolder = expectedEvts.next(); + Map acHolder = actualEvts.next(); + + for (Map.Entry ex : exHolder.entrySet()) { + Object actVal = acHolder.get(ex.getKey()); + + // work around for current II limitations + if (acHolder.get(ex.getKey()) instanceof Double) { + actVal = ((Double) actVal).floatValue(); + } + Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal); + } + } + + if (actualEvts.hasNext()) { + throw new ISE("This event iterator should be exhausted!"); + } + } + + if (actualIter.hasNext()) { + throw new ISE("This iterator should be exhausted!"); + } + } + + private static Iterable 
populateNullColumnAtLastForQueryableIndexCase( + Iterable results, + String columnName + ) + { + // A Queryable index does not have the null column when it has loaded a index. + for (ScanResultValue value : results) { + Set columns = value.getColumns(); + if (columns.contains(columnName)) { + break; + } + columns.add(columnName); + } + + return results; + } + +} diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java new file mode 100644 index 000000000000..64d92b71547f --- /dev/null +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.scan; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.TableDataSource; +import io.druid.query.spec.LegacySegmentSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class ScanQuerySpecTest +{ + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + @Test + public void testSerializationLegacyString() throws Exception + { + String legacy = + "{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"}," + + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]}," + + "\"filter\":null," + + "\"columns\":[\"market\",\"quality\",\"index\"]," + + "\"limit\":3," + + "\"context\":null}"; + + String current = + "{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"}," + + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]}," + + "\"resultFormat\":\"list\"," + + "\"batchSize\":20480," + + "\"limit\":3," + + "\"filter\":null," + + "\"columns\":[\"market\",\"quality\",\"index\"]," + + "\"context\":null," + + "\"descending\":false}"; + + ScanQuery query = new ScanQuery( + new TableDataSource(QueryRunnerTestHelper.dataSource), + new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")), + null, + 0, + 3, + null, + Arrays.asList("market", "quality", "index"), + null + ); + + String actual = jsonMapper.writeValueAsString(query); + Assert.assertEquals(current, actual); + Assert.assertEquals(query, jsonMapper.readValue(actual, ScanQuery.class)); + Assert.assertEquals(query, jsonMapper.readValue(legacy, ScanQuery.class)); + } +} From 5bef632daee2e2a6544ac28ff7bbd54594cba037 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 6 Dec 2016 02:06:11 +0800 Subject: 
[PATCH 07/12] add scan query document --- .../extensions-contrib/scan-query.md | 143 ++++++++++++++++++ docs/content/development/extensions.md | 1 + 2 files changed, 144 insertions(+) create mode 100644 docs/content/development/extensions-contrib/scan-query.md diff --git a/docs/content/development/extensions-contrib/scan-query.md b/docs/content/development/extensions-contrib/scan-query.md new file mode 100644 index 000000000000..457314c8d480 --- /dev/null +++ b/docs/content/development/extensions-contrib/scan-query.md @@ -0,0 +1,143 @@ +--- +layout: doc_page +--- + +# Scan query +Scan query returns raw Druid rows in streaming mode. + +```json + { + "queryType": "scan", + "dataSource": "wikipedia", + "resultFormat": "list", + "columns":[], + "intervals": [ + "2013-01-01/2013-01-02" + ], + "batchSize":20480, + "limit":5 + } +``` + +There are several main parts to a scan query: + +|property|description|required?| +|--------|-----------|---------| +|queryType|This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query|yes| +|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes| +|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes| +|resultFormat|How result represented, list or valueVector. Currently only `list` is supported. Default is `list`|no| +|filter|See [Filters](../querying/filters.html)|no| +|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| +|batchSize|How many rows buffered before return to client. Default is `20480`|no| +|limit|How many rows to return. 
If not specified, all rows will be returned.|no| +|context|An additional JSON Object which can be used to specify certain flags.|no| + +The format of the result is: + +```json + [{ + "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9", + "columns" : [ + "robot", + "namespace", + "anonymous", + "unpatrolled", + "page", + "language", + "newpage", + "user", + "count", + "added", + "delta", + "variation", + "deleted" + ], + "events" : [ { + "timestamp" : "2013-01-01T00:00:00.000Z", + "robot" : "1", + "namespace" : "article", + "anonymous" : "0", + "unpatrolled" : "0", + "page" : "11._korpus_(NOVJ)", + "language" : "sl", + "newpage" : "0", + "user" : "EmausBot", + "count" : 1.0, + "added" : 39.0, + "delta" : 39.0, + "variation" : 39.0, + "deleted" : 0.0 + }, { + "timestamp" : "2013-01-01T00:00:00.000Z", + "robot" : "0", + "namespace" : "article", + "anonymous" : "0", + "unpatrolled" : "0", + "page" : "112_U.S._580", + "language" : "en", + "newpage" : "1", + "user" : "MZMcBride", + "count" : 1.0, + "added" : 70.0, + "delta" : 70.0, + "variation" : 70.0, + "deleted" : 0.0 + }, { + "timestamp" : "2013-01-01T00:00:00.000Z", + "robot" : "0", + "namespace" : "article", + "anonymous" : "0", + "unpatrolled" : "0", + "page" : "113_U.S._243", + "language" : "en", + "newpage" : "1", + "user" : "MZMcBride", + "count" : 1.0, + "added" : 77.0, + "delta" : 77.0, + "variation" : 77.0, + "deleted" : 0.0 + }, { + "timestamp" : "2013-01-01T00:00:00.000Z", + "robot" : "0", + "namespace" : "article", + "anonymous" : "0", + "unpatrolled" : "0", + "page" : "113_U.S._73", + "language" : "en", + "newpage" : "1", + "user" : "MZMcBride", + "count" : 1.0, + "added" : 70.0, + "delta" : 70.0, + "variation" : 70.0, + "deleted" : 0.0 + }, { + "timestamp" : "2013-01-01T00:00:00.000Z", + "robot" : "0", + "namespace" : "article", + "anonymous" : "0", + "unpatrolled" : "0", + "page" : "113_U.S._756", + "language" : "en", + "newpage" : "1", + 
"user" : "MZMcBride", + "count" : 1.0, + "added" : 68.0, + "delta" : 68.0, + "variation" : 68.0, + "deleted" : 0.0 + } ] +} ] +``` +The biggest difference between select query and scan query is that, scan query doesn't retain all rows in memory before rows can be returned to client. +It will cause memory pressure if too many rows required by select query. +Scan query doesn't have this issue. +Scan query can return all rows without issuing another pagination query, which is extremely useful when query against historical or realtime node directly. + +## Known issues +1. Scan query doesn't respect `limit` from broker side, currently only historical and realtime node respect `limit` +2. Scan query doesn't respect `timeout` config in query context. +3. Scan query can't be cancelled. +4. Scan query is running on qtp thread, thus it doesn't respect query priority. \ No newline at end of file diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 5813ccd78dcb..6a2e996ffd44 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -65,6 +65,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)| +|scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)| ## Promoting Community Extension to Core Extension From 4489a50167ae071c7442aba3b72ca55fe2972a5a Mon Sep 17 00:00:00 2001 From: kaijianding Date: Wed, 28 Dec 2016 01:20:04 +0800 Subject: [PATCH 08/12] fix merge conflicts --- .../io/druid/query/scan/ScanQueryEngine.java | 19 ++++++++++---- .../druid/query/select/SelectQueryEngine.java | 6 ++--- .../java/io/druid/server/QueryResource.java | 26 
+++++++++---------- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index ef2e7a2b765c..282195e4af88 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -29,21 +29,24 @@ import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.Filter; import io.druid.query.select.SelectQueryEngine; import io.druid.segment.Cursor; -import io.druid.segment.DimensionSelector; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; +import io.druid.segment.VirtualColumns; import io.druid.segment.column.Column; import io.druid.segment.filter.Filters; import io.druid.timeline.DataSegmentUtils; import org.joda.time.Interval; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -51,6 +54,7 @@ public class ScanQueryEngine { + private static final SelectQueryEngine.SelectStrategyFactory STRATEGY_FACTORY = new SelectQueryEngine.SelectStrategyFactory(); public Sequence process( final ScanQuery query, final Segment segment, @@ -100,6 +104,7 @@ public Sequence process( adapter.makeCursors( filter, intervals.get(0), + VirtualColumns.EMPTY, QueryGranularities.ALL, query.isDescending() ), @@ -117,10 +122,14 @@ public Iterator make() final Set columns = Sets.newHashSet(); final LongColumnSelector 
timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); - final Map dimSelectors = Maps.newHashMap(); + final List> selectorPlusList = Arrays.asList( + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + Lists.newArrayList(dims), + cursor + ) + ); for (DimensionSpec dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); - dimSelectors.put(dim.getOutputName(), dimSelector); columns.add(dim.getOutputName()); } @@ -172,7 +181,7 @@ private Object rowsToList() final Map theEvent = SelectQueryEngine.singleEvent( ScanResultValue.timestampKey, timestampColumnSelector, - dimSelectors, + selectorPlusList, metSelectors ); events.add(theEvent); diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index f9c7f62e2506..41597aab2bb8 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -64,7 +64,7 @@ public class SelectQueryEngine { private static final SelectStrategyFactory STRATEGY_FACTORY = new SelectStrategyFactory(); - private static class SelectStrategyFactory implements ColumnSelectorStrategyFactory + public static class SelectStrategyFactory implements ColumnSelectorStrategyFactory { @Override public SelectColumnSelectorStrategy makeColumnSelectorStrategy( @@ -205,7 +205,7 @@ public Result apply(Cursor cursor) final Map theEvent = singleEvent( EventHolder.timestampKey, timestampColumnSelector, - dimSelectors, + selectorPlusList, metSelectors ); @@ -229,7 +229,7 @@ public Result apply(Cursor cursor) public static Map singleEvent( String timestampKey, LongColumnSelector timestampColumnSelector, - Map dimSelectors, + List> selectorPlusList, Map metSelectors ) { diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 
c946f5391375..b9861ee5e61f 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -255,19 +255,19 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE CountingOutputStream os = new CountingOutputStream(outputStream); jsonWriter.writeValue(os, yielder); - os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. - os.close(); - successfulQueryCount.incrementAndGet(); - final long queryTime = System.currentTimeMillis() - start; - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .setDimension("success", "true") - .build("query/time", queryTime) - ); - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .build("query/bytes", os.getCount()) - ); + os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. + os.close(); + successfulQueryCount.incrementAndGet(); + final long queryTime = System.currentTimeMillis() - start; + emitter.emit( + DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) + .setDimension("success", "true") + .build("query/time", queryTime) + ); + emitter.emit( + DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) + .build("query/bytes", os.getCount()) + ); requestLogger.log( new RequestLogLine( From 8d17ebb9342a7e44647db60770006c03bcaa4bdf Mon Sep 17 00:00:00 2001 From: kaijianding Date: Mon, 16 Jan 2017 00:50:18 +0800 Subject: [PATCH 09/12] add compactedList resultFormat, this format is better for json ser/der --- .../extensions-contrib/scan-query.md | 24 +++- .../java/io/druid/query/scan/ScanQuery.java | 1 + .../io/druid/query/scan/ScanQueryEngine.java | 45 +++++-- .../io/druid/query/scan/ScanResultValue.java | 8 +- .../druid/query/scan/ScanQueryRunnerTest.java | 113 ++++++++++++++++-- 5 files 
changed, 165 insertions(+), 26 deletions(-) diff --git a/docs/content/development/extensions-contrib/scan-query.md b/docs/content/development/extensions-contrib/scan-query.md index 457314c8d480..6cb06fcb8b5c 100644 --- a/docs/content/development/extensions-contrib/scan-query.md +++ b/docs/content/development/extensions-contrib/scan-query.md @@ -26,19 +26,20 @@ There are several main parts to a scan query: |queryType|This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query|yes| |dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes| |intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes| -|resultFormat|How result represented, list or valueVector. Currently only `list` is supported. Default is `list`|no| +|resultFormat|How result represented, list or compactedList or valueVector. Currently only `list` and `compactedList` are supported. Default is `list`|no| |filter|See [Filters](../querying/filters.html)|no| |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| |batchSize|How many rows buffered before return to client. Default is `20480`|no| |limit|How many rows to return. 
If not specified, all rows will be returned.|no| |context|An additional JSON Object which can be used to specify certain flags.|no| -The format of the result is: +The format of the result when resultFormat equals to `list`: ```json [{ "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9", "columns" : [ + "timestamp", "robot", "namespace", "anonymous", @@ -131,6 +132,25 @@ The format of the result is: } ] } ] ``` + +The format of the result when resultFormat equals to `compactedList`: + +```json + [{ + "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9", + "columns" : [ + "timestamp", "robot", "namespace", "anonymous", "unpatrolled", "page", "language", "newpage", "user", "count", "added", "delta", "variation", "deleted" + ], + "events" : [ + ["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0], + ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0], + ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0], + ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._73", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0], + ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._756", "en", "1", "MZMcBride", 1.0, 68.0, 68.0, 68.0, 0.0] + ] +} ] +``` + The biggest difference between select query and scan query is that, scan query doesn't retain all rows in memory before rows can be returned to client. It will cause memory pressure if too many rows required by select query. Scan query doesn't have this issue. 
diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java index c74e41ff2c39..cdd2d1da4c13 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -43,6 +43,7 @@ public class ScanQuery extends BaseQuery { public static final String SCAN = "scan"; public static final String RESULT_FORMAT_LIST = "list"; + public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList"; public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector"; private final String resultFormat; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 282195e4af88..647c6e96ba04 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -23,7 +23,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import io.druid.granularity.QueryGranularities; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.BaseSequence; @@ -50,7 +49,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; public class ScanQueryEngine { @@ -80,10 +78,22 @@ public Sequence process( List allDims = Lists.newLinkedList(adapter.getAvailableDimensions()); List allMetrics = Lists.newLinkedList(adapter.getAvailableMetrics()); + final List allColumns = Lists.newLinkedList(); if (query.getColumns() != null && !query.getColumns().isEmpty()) { + if (!query.getColumns().contains(ScanResultValue.timestampKey)) { + 
allColumns.add(ScanResultValue.timestampKey); + } + allColumns.addAll(query.getColumns()); allDims.retainAll(query.getColumns()); allMetrics.retainAll(query.getColumns()); } + if (query.getColumns() == null || query.getColumns().isEmpty()) { + if (!allDims.contains(ScanResultValue.timestampKey)) { + allColumns.add(ScanResultValue.timestampKey); + } + allColumns.addAll(allDims); + allColumns.addAll(allMetrics); + } final List dims = DefaultDimensionSpec.toSpec(allDims); final List metrics = allMetrics; @@ -119,7 +129,6 @@ public Sequence apply(final Cursor cursor) @Override public Iterator make() { - final Set columns = Sets.newHashSet(); final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); final List> selectorPlusList = Arrays.asList( @@ -129,15 +138,11 @@ public Iterator make() cursor ) ); - for (DimensionSpec dim : dims) { - columns.add(dim.getOutputName()); - } final Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); metSelectors.put(metric, metricSelector); - columns.add(metric); } final int batchSize = query.getBatchSize(); return new Iterator() @@ -158,11 +163,13 @@ public ScanResultValue next() String resultFormat = query.getResultFormat(); if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) { throw new UnsupportedOperationException("valueVector is not supported now"); + } else if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) { + events = rowsToCompactedList(); } else { events = rowsToList(); } responseContext.put("count", (int) responseContext.get("count") + (offset - lastOffset)); - return new ScanResultValue(segmentId, columns, events); + return new ScanResultValue(segmentId, allColumns, events); } @Override @@ -171,11 +178,29 @@ public void remove() throw new UnsupportedOperationException(); } + private Object rowsToCompactedList() + { + return Lists.transform( + (List>) 
rowsToList(), + new Function, Object>() + { + @Override + public Object apply(Map input) + { + List eventValues = Lists.newArrayListWithCapacity(allColumns.size()); + for (String expectedColumn : allColumns) { + eventValues.add(input.get(expectedColumn)); + } + return eventValues; + } + } + ); + } + private Object rowsToList() { - int i = 0; List> events = Lists.newArrayListWithCapacity(batchSize); - for (; !cursor.isDone() + for (int i = 0; !cursor.isDone() && i < batchSize && offset < limit; cursor.advance(), i++, offset++) { final Map theEvent = SelectQueryEngine.singleEvent( diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java index 4a834663bb3b..76d8bf5dba97 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanResultValue.java @@ -21,20 +21,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Set; +import java.util.List; public class ScanResultValue implements Comparable { public static final String timestampKey = "timestamp"; private final String segmentId; - private final Set columns; + private final List columns; private final Object events; @JsonCreator public ScanResultValue( @JsonProperty("segmentId") String segmentId, - @JsonProperty("columns") Set columns, + @JsonProperty("columns") List columns, @JsonProperty("events") Object events ) { @@ -50,7 +50,7 @@ public String getSegmentId() } @JsonProperty - public Set getColumns() + public List getColumns() { return columns; } diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java index 74a2ceb1d9d2..e5277256c5e2 100644 --- 
a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/ScanQueryRunnerTest.java @@ -48,6 +48,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -129,6 +130,7 @@ private ScanQuery.ScanQueryBuilder newTestQuery() public void testFullOnSelect() { List columns = Lists.newArrayList( + ScanResultValue.timestampKey, "market", "quality", "placement", @@ -136,9 +138,9 @@ public void testFullOnSelect() "partial_null_column", "null_column", "index", - "quality_uniques", "indexMin", - "indexMaxPlusTen" + "indexMaxPlusTen", + "quality_uniques" ); ScanQuery query = newTestQuery() .intervals(I_0112_0114) @@ -159,6 +161,42 @@ public void testFullOnSelect() verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column")); } + @Test + public void testFullOnSelectAsCompactedList() + { + final List columns = Lists.newArrayList( + ScanResultValue.timestampKey, + "market", + "quality", + "placement", + "placementish", + "partial_null_column", + "null_column", + "index", + "indexMin", + "indexMaxPlusTen", + "quality_uniques" + ); + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List expectedResults = toExpected( + toFullEvents(V_0112_0114), + columns, + 0, + 3 + ); + verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(compactedListToRow(results), "null_column")); + } + @Test public void testSelectWithDimsAndMets() { @@ -185,13 +223,47 @@ public void testSelectWithDimsAndMets() }, V_0112_0114 ), - Lists.newArrayList("market", "index"), + Lists.newArrayList(ScanResultValue.timestampKey, 
"market", "index"), 0, 3 ); verify(expectedResults, results); } + @Test + public void testSelectWithDimsAndMetsAsCompactedList() + { + ScanQuery query = newTestQuery() + .intervals(I_0112_0114) + .columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric) + .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .build(); + + HashMap context = new HashMap(); + Iterable results = Sequences.toList( + runner.run(query, context), + Lists.newArrayList() + ); + + List expectedResults = toExpected( + toEvents( + new String[]{ + ScanResultValue.timestampKey + ":TIME", + QueryRunnerTestHelper.marketDimension + ":STRING", + null, + null, + null, + QueryRunnerTestHelper.indexMetric + ":FLOAT" + }, + V_0112_0114 + ), + Lists.newArrayList(ScanResultValue.timestampKey, "market", "index"), + 0, + 3 + ); + verify(expectedResults, compactedListToRow(results)); + } + @Test public void testFullOnSelectWithFilterAndLimit() { @@ -246,7 +318,7 @@ public void testFullOnSelectWithFilterAndLimit() List expectedResults = toExpected( events, - Lists.newArrayList("quality", "index"), + Lists.newArrayList(ScanResultValue.timestampKey, "quality", "index"), 0, limit ); @@ -257,7 +329,6 @@ public void testFullOnSelectWithFilterAndLimit() @Test public void testSelectWithFilterLookupExtractionFn() { - Map extractionMap = new HashMap<>(); extractionMap.put("total_market", "replaced"); MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap, false); @@ -299,7 +370,7 @@ public void testSelectWithFilterLookupExtractionFn() List expectedResults = toExpected( events, - Lists.newArrayList(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric), + Lists.newArrayList(ScanResultValue.timestampKey, QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric), 0, 3 ); @@ -356,7 +427,7 @@ public void testFullSelectNoDimensionAndMetric() List expectedResults = toExpected( events, - Lists.newArrayList(), + 
Lists.newArrayList(ScanResultValue.timestampKey, "foo", "foo2"), 0, 3 ); @@ -440,7 +511,7 @@ private List toExpected( expected.add( new ScanResultValue( QueryRunnerTestHelper.segmentId, - Sets.newHashSet(columns), + columns, events ) ); @@ -462,7 +533,9 @@ private static void verify( Assert.assertEquals(expected.getSegmentId(), actual.getSegmentId()); - Assert.assertEquals(expected.getColumns(), actual.getColumns()); + Set exColumns = Sets.newTreeSet(expected.getColumns()); + Set acColumns = Sets.newTreeSet(actual.getColumns()); + Assert.assertEquals(exColumns, acColumns); Iterator> expectedEvts = ((List>) expected.getEvents()).iterator(); Iterator> actualEvts = ((List>) actual.getEvents()).iterator(); @@ -499,7 +572,7 @@ private static Iterable populateNullColumnAtLastForQueryableInd { // A Queryable index does not have the null column when it has loaded a index. for (ScanResultValue value : results) { - Set columns = value.getColumns(); + List columns = value.getColumns(); if (columns.contains(columnName)) { break; } @@ -509,4 +582,24 @@ private static Iterable populateNullColumnAtLastForQueryableInd return results; } + private Iterable compactedListToRow(Iterable results) { + return Iterables.transform(results, new Function() + { + @Override + public ScanResultValue apply(ScanResultValue input) + { + List mapEvents = Lists.newLinkedList(); + List events = ((List) input.getEvents()); + for (int i = 0; i < events.size(); i++) { + Iterator compactedEventIter = ((List) events.get(i)).iterator(); + Map mapEvent = new LinkedHashMap(); + for (String column : input.getColumns()) { + mapEvent.put(column, compactedEventIter.next()); + } + mapEvents.add(mapEvent); + } + return new ScanResultValue(input.getSegmentId(), input.getColumns(), mapEvents); + } + }); + } } From f098c260135a76ac80cdb940a069c9ee71cfd9e4 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Mon, 16 Jan 2017 10:45:24 +0800 Subject: [PATCH 10/12] respect query timeout --- 
.../development/extensions-contrib/scan-query.md | 5 ++--- .../main/java/io/druid/query/scan/ScanQueryEngine.java | 8 ++++++++ .../io/druid/query/scan/ScanQueryRunnerFactory.java | 10 ++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/docs/content/development/extensions-contrib/scan-query.md b/docs/content/development/extensions-contrib/scan-query.md index 6cb06fcb8b5c..23ed751e1a57 100644 --- a/docs/content/development/extensions-contrib/scan-query.md +++ b/docs/content/development/extensions-contrib/scan-query.md @@ -158,6 +158,5 @@ Scan query can return all rows without issuing another pagination query, which i ## Known issues 1. Scan query doesn't respect `limit` from broker side, currently only historical and realtime node respect `limit` -2. Scan query doesn't respect `timeout` config in query context. -3. Scan query can't be cancelled. -4. Scan query is running on qtp thread, thus it doesn't respect query priority. \ No newline at end of file +2. Scan query can't be cancelled. +3. Scan query is running on qtp thread, thus it doesn't respect query priority. 
\ No newline at end of file diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 647c6e96ba04..04f53e3181a8 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.ColumnSelectorPlus; +import io.druid.query.QueryInterruptedException; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.Filter; @@ -49,6 +50,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; public class ScanQueryEngine { @@ -65,6 +67,8 @@ public Sequence process( return Sequences.empty(); } } + final Long timeoutAt = (long) responseContext.get("timeoutAt"); + final long start = System.currentTimeMillis(); final StorageAdapter adapter = segment.asStorageAdapter(); if (adapter == null) { @@ -158,6 +162,9 @@ public boolean hasNext() @Override public ScanResultValue next() { + if (System.currentTimeMillis() >= timeoutAt) { + throw new QueryInterruptedException(new TimeoutException()); + } int lastOffset = offset; Object events = null; String resultFormat = query.getResultFormat(); @@ -169,6 +176,7 @@ public ScanResultValue next() events = rowsToList(); } responseContext.put("count", (int) responseContext.get("count") + (offset - lastOffset)); + responseContext.put("timeoutAt", timeoutAt - (System.currentTimeMillis() - start)); return new ScanResultValue(segmentId, allColumns, events); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java 
b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 012d448768c8..862a3cc0e98b 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -20,10 +20,12 @@ import com.google.common.base.Function; import com.google.inject.Inject; +import io.druid.common.utils.JodaUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; +import io.druid.query.QueryContextKeys; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -67,6 +69,10 @@ public Sequence run( final Query query, final Map responseContext ) { + final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); + final long timeoutAt = queryTimeout == null + ? 
JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue(); + responseContext.put("timeoutAt", timeoutAt); return Sequences.concat( Sequences.map( Sequences.simple(queryRunners), @@ -110,6 +116,10 @@ public Sequence run( throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class); } + // it happens in unit tests + if (responseContext.get("timeoutAt") == null) { + responseContext.put("timeoutAt", JodaUtils.MAX_INSTANT); + }; return engine.process((ScanQuery) query, segment, responseContext); } } From 4079396d2d666ee67b7e3a7d73f45b586de32bb0 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 17 Jan 2017 01:16:20 +0800 Subject: [PATCH 11/12] respect query limit on broker --- .../extensions-contrib/scan-query.md | 7 +- .../io/druid/query/scan/ScanQueryEngine.java | 6 +- .../query/scan/ScanQueryLimitRowIterator.java | 98 +++++++ .../query/scan/ScanQueryQueryToolChest.java | 28 +- .../query/scan/MultiSegmentScanQueryTest.java | 242 ++++++++++++++++++ 5 files changed, 368 insertions(+), 13 deletions(-) create mode 100644 extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java create mode 100644 extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java diff --git a/docs/content/development/extensions-contrib/scan-query.md b/docs/content/development/extensions-contrib/scan-query.md index 23ed751e1a57..3eef162b254e 100644 --- a/docs/content/development/extensions-contrib/scan-query.md +++ b/docs/content/development/extensions-contrib/scan-query.md @@ -154,9 +154,4 @@ The format of the result when resultFormat equals to `compactedList`: The biggest difference between select query and scan query is that, scan query doesn't retain all rows in memory before rows can be returned to client. It will cause memory pressure if too many rows required by select query. Scan query doesn't have this issue. 
-Scan query can return all rows without issuing another pagination query, which is extremely useful when query against historical or realtime node directly. - -## Known issues -1. Scan query doesn't respect `limit` from broker side, currently only historical and realtime node respect `limit` -2. Scan query can't be cancelled. -3. Scan query is running on qtp thread, thus it doesn't respect query priority. \ No newline at end of file +Scan query can return all rows without issuing another pagination query, which is extremely useful when querying historical or realtime nodes directly. \ No newline at end of file diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 04f53e3181a8..27aeccb0b173 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -43,7 +43,6 @@ import io.druid.segment.VirtualColumns; import io.druid.segment.column.Column; import io.druid.segment.filter.Filters; -import io.druid.timeline.DataSegmentUtils; import org.joda.time.Interval; import java.util.Arrays; @@ -104,8 +103,7 @@ public Sequence process( final List intervals = query.getQuerySegmentSpec().getIntervals(); Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals); - // should be rewritten with given interval - final String segmentId = DataSegmentUtils.withInterval(dataSource, segment.getIdentifier(), intervals.get(0)); + final String segmentId = segment.getIdentifier(); final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); @@ -195,7 +193,7 @@ private Object rowsToCompactedList() @Override public Object apply(Map input) { - List eventValues = Lists.newArrayListWithCapacity(allColumns.size()); + List 
eventValues = Lists.newArrayListWithExpectedSize(allColumns.size()); for (String expectedColumn : allColumns) { eventValues.add(input.get(expectedColumn)); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java new file mode 100644 index 000000000000..78a11073f55e --- /dev/null +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java @@ -0,0 +1,98 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.query.scan; + +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Yielder; +import io.druid.java.util.common.guava.YieldingAccumulator; +import io.druid.java.util.common.parsers.CloseableIterator; +import io.druid.query.QueryRunner; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ScanQueryLimitRowIterator implements CloseableIterator +{ + private Yielder yielder; + private String resultFormat; + private int limit = 0; + private int count = 0; + + public ScanQueryLimitRowIterator( + QueryRunner baseRunner, ScanQuery query, + Map responseContext + ) + { + resultFormat = query.getResultFormat(); + limit = query.getLimit(); + Sequence baseSequence = baseRunner.run(query, responseContext); + yielder = baseSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in) + { + yield(); + return in; + } + } + ); + } + + @Override + public boolean hasNext() + { + return !yielder.isDone() && count < limit; + } + + @Override + public ScanResultValue next() + { + ScanResultValue batch = yielder.get(); + if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) || + ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) { + List events = (List) batch.getEvents(); + if (events.size() <= limit - count) { + count += events.size(); + yielder = yielder.next(null); + return batch; + } else { + // last batch + int left = limit - count; + count = limit; + return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left)); + } + } + throw new UnsupportedOperationException(ScanQuery.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet"); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException + { + yielder.close(); + } +} diff --git 
a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java index 052a83b48cea..222efda0a8e1 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -22,6 +22,8 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.java.util.common.guava.BaseSequence; +import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; import io.druid.query.DruidMetrics; import io.druid.query.Query; @@ -43,10 +45,30 @@ public QueryRunner mergeResults(final QueryRunner() { @Override - public Sequence run(final Query input, Map responseContext) + public Sequence run( + final Query query, final Map responseContext + ) { - // no actually merge - return runner.run(input, responseContext); + ScanQuery scanQuery = (ScanQuery) query; + if (scanQuery.getLimit() == Integer.MAX_VALUE) { + return runner.run(query, responseContext); + } + return new BaseSequence<>( + new BaseSequence.IteratorMaker() + { + @Override + public ScanQueryLimitRowIterator make() + { + return new ScanQueryLimitRowIterator(runner, (ScanQuery) query, responseContext); + } + + @Override + public void cleanup(ScanQueryLimitRowIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + } + ); } }; } diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java new file mode 100644 index 000000000000..f2fbf0f5f41e --- /dev/null +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to 
Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.io.CharSource; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.guava.MergeSequence; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.TableDataSource; +import io.druid.query.select.SelectQueryRunnerTest; +import io.druid.segment.IncrementalIndexSegment; +import io.druid.segment.Segment; +import io.druid.segment.TestIndex; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.joda.time.DateTime; +import 
org.joda.time.Interval; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +@RunWith(Parameterized.class) +public class MultiSegmentScanQueryTest +{ + private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(); + + private static final QueryRunnerFactory factory = new ScanQueryRunnerFactory( + toolChest, + new ScanQueryEngine() + ); + + // time modified version of druid.sample.tsv + public static final String[] V_0112 = { + "2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000", + "2011-01-12T01:00:00.000Z spot business preferred bpreferred 100.000000", + "2011-01-12T02:00:00.000Z spot entertainment preferred epreferred 100.000000", + "2011-01-12T03:00:00.000Z spot health preferred hpreferred 100.000000", + "2011-01-12T04:00:00.000Z spot mezzanine preferred mpreferred 100.000000", + "2011-01-12T05:00:00.000Z spot news preferred npreferred 100.000000", + "2011-01-12T06:00:00.000Z spot premium preferred ppreferred 100.000000", + "2011-01-12T07:00:00.000Z spot technology preferred tpreferred 100.000000", + "2011-01-12T08:00:00.000Z spot travel preferred tpreferred 100.000000", + "2011-01-12T09:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000", + "2011-01-12T10:00:00.000Z total_market premium preferred ppreferred 1000.000000", + "2011-01-12T11:00:00.000Z upfront mezzanine preferred mpreferred 800.000000 value", + "2011-01-12T12:00:00.000Z upfront premium preferred ppreferred 800.000000 value", + "2011-01-12T13:00:00.000Z upfront premium preferred ppreferred2 800.000000 value" + }; + public static final String[] V_0113 = { + "2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713", + "2011-01-13T01:00:00.000Z spot 
business preferred bpreferred 103.629399", + "2011-01-13T02:00:00.000Z spot entertainment preferred epreferred 110.087299", + "2011-01-13T03:00:00.000Z spot health preferred hpreferred 114.947403", + "2011-01-13T04:00:00.000Z spot mezzanine preferred mpreferred 104.465767", + "2011-01-13T05:00:00.000Z spot news preferred npreferred 102.851683", + "2011-01-13T06:00:00.000Z spot premium preferred ppreferred 108.863011", + "2011-01-13T07:00:00.000Z spot technology preferred tpreferred 111.356672", + "2011-01-13T08:00:00.000Z spot travel preferred tpreferred 106.236928", + "2011-01-13T09:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505", + "2011-01-13T10:00:00.000Z total_market premium preferred ppreferred 1689.012875", + "2011-01-13T11:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value", + "2011-01-13T12:00:00.000Z upfront premium preferred ppreferred 1564.617729 value" + }; + + private static Segment segment0; + private static Segment segment1; + + @BeforeClass + public static void setup() throws IOException + { + CharSource v_0112 = CharSource.wrap(StringUtils.join(V_0112, "\n")); + CharSource v_0113 = CharSource.wrap(StringUtils.join(V_0113, "\n")); + + IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIndex("2011-01-12T00:00:00.000Z"), v_0112); + IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIndex("2011-01-13T00:00:00.000Z"), v_0113); + + segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1")); + segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1")); + } + + private static String makeIdentifier(IncrementalIndex index, String version) + { + return makeIdentifier(index.getInterval(), version); + } + + private static String makeIdentifier(Interval interval, String version) + { + return DataSegment.makeDataSegmentIdentifier( + QueryRunnerTestHelper.dataSource, + interval.getStart(), + interval.getEnd(), + version, + NoneShardSpec.instance() + ); + } + + private static 
IncrementalIndex newIndex(String minTimeStamp) + { + return newIndex(minTimeStamp, 10000); + } + + private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount) + { + final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withMinTimestamp(new DateTime(minTimeStamp).getMillis()) + .withQueryGranularity(QueryGranularities.HOUR) + .withMetrics(TestIndex.METRIC_AGGS) + .build(); + return new OnheapIncrementalIndex(schema, true, maxRowCount); + } + + @AfterClass + public static void clear() + { + IOUtils.closeQuietly(segment0); + IOUtils.closeQuietly(segment1); + } + + @Parameterized.Parameters(name = "limit={0},batchSize={1}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.cartesian(Arrays.asList(0, 1, 3, 7, 10, 20, 1000), Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000)); + } + + private final int limit; + private final int batchSize; + + public MultiSegmentScanQueryTest(int limit, int batchSize) + { + this.limit = limit; + this.batchSize = batchSize; + } + + private ScanQuery.ScanQueryBuilder newBuilder() + { + return ScanQuery.newScanQueryBuilder() + .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) + .intervals(SelectQueryRunnerTest.I_0112_0114) + .batchSize(batchSize) + .columns(Arrays.asList()) + .limit(limit); + } + + @Test + public void testMergeRunnersWithLimit() + { + ScanQuery query = newBuilder().build(); + List results = Sequences.toList( + factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of( + factory.createRunner(segment0), + factory.createRunner(segment1) + )).run(query, new HashMap()), + Lists.newArrayList() + ); + int totalCount = 0; + for (ScanResultValue result : results) { + System.out.println(((List) result.getEvents()).size()); + totalCount += ((List) result.getEvents()).size(); + } + Assert.assertEquals( + totalCount, + limit != 0 ? 
Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length + ); + } + + @Test + public void testMergeResultsWithLimit() + { + QueryRunner runner = toolChest.mergeResults( + new QueryRunner() { + @Override + public Sequence run( + Query query, Map responseContext + ) + { + // simulate results back from 2 historicals + List> sequences = Lists.newArrayListWithExpectedSize(2); + sequences.add(factory.createRunner(segment0).run(query, new HashMap())); + sequences.add(factory.createRunner(segment1).run(query, new HashMap())); + return new MergeSequence<>( + query.getResultOrdering(), + Sequences.simple(sequences) + ); + } + } + ); + ScanQuery query = newBuilder().build(); + List results = Sequences.toList( + runner.run(query, new HashMap()), + Lists.newArrayList() + ); + int totalCount = 0; + for (ScanResultValue result : results) { + totalCount += ((List) result.getEvents()).size(); + } + Assert.assertEquals( + totalCount, + limit != 0 ? Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length + ); + } +} From 06253292cf542a064f966c50f6f66f85b8610fcb Mon Sep 17 00:00:00 2001 From: kaijianding Date: Thu, 19 Jan 2017 15:16:29 +0800 Subject: [PATCH 12/12] use static consts and remove unused code --- .../io/druid/query/scan/ScanQueryEngine.java | 28 ++++++++++--------- .../query/scan/ScanQueryRunnerFactory.java | 8 ++++-- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 27aeccb0b173..5b04bb19a8d6 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -20,7 +20,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.druid.granularity.QueryGranularities; @@ -60,13 +59,13 @@ public Sequence process( final Map responseContext ) { - if (responseContext.get("count") != null) { - int count = (int) responseContext.get("count"); + if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) { + int count = (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); if (count >= query.getLimit()) { return Sequences.empty(); } } - final Long timeoutAt = (long) responseContext.get("timeoutAt"); + final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT); final long start = System.currentTimeMillis(); final StorageAdapter adapter = segment.asStorageAdapter(); @@ -76,9 +75,6 @@ public Sequence process( ); } - // at the point where this code is called, only one datasource should exist. - String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); - List allDims = Lists.newLinkedList(adapter.getAvailableDimensions()); List allMetrics = Lists.newLinkedList(adapter.getAvailableMetrics()); final List allColumns = Lists.newLinkedList(); @@ -90,7 +86,7 @@ public Sequence process( allDims.retainAll(query.getColumns()); allMetrics.retainAll(query.getColumns()); } - if (query.getColumns() == null || query.getColumns().isEmpty()) { + else { if (!allDims.contains(ScanResultValue.timestampKey)) { allColumns.add(ScanResultValue.timestampKey); } @@ -107,10 +103,10 @@ public Sequence process( final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); - if (responseContext.get("count") == null) { - responseContext.put("count", 0); + if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) { + responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0); } - final int limit = query.getLimit() - (int) responseContext.get("count"); + final int limit = query.getLimit() - (int) 
responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); return Sequences.concat( Sequences.map( adapter.makeCursors( @@ -173,8 +169,14 @@ public ScanResultValue next() } else { events = rowsToList(); } - responseContext.put("count", (int) responseContext.get("count") + (offset - lastOffset)); - responseContext.put("timeoutAt", timeoutAt - (System.currentTimeMillis() - start)); + responseContext.put( + ScanQueryRunnerFactory.CTX_COUNT, + (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) + ); + responseContext.put( + ScanQueryRunnerFactory.CTX_TIMEOUT_AT, + timeoutAt - (System.currentTimeMillis() - start) + ); return new ScanResultValue(segmentId, allColumns, events); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 862a3cc0e98b..6b1244c5ff8a 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -36,6 +36,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory { + public static final String CTX_TIMEOUT_AT = "timeoutAt"; + public static final String CTX_COUNT = "count"; private final ScanQueryQueryToolChest toolChest; private final ScanQueryEngine engine; @@ -72,7 +74,7 @@ public Sequence run( final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); final long timeoutAt = queryTimeout == null ? 
JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue(); - responseContext.put("timeoutAt", timeoutAt); + responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( Sequences.map( Sequences.simple(queryRunners), @@ -117,8 +119,8 @@ public Sequence run( } // it happens in unit tests - if (responseContext.get("timeoutAt") == null) { - responseContext.put("timeoutAt", JodaUtils.MAX_INSTANT); + if (responseContext.get(CTX_TIMEOUT_AT) == null) { + responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT); }; return engine.process((ScanQuery) query, segment, responseContext); }