From 50060576e60812f04f462fb42678aec78611a190 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 18 May 2017 17:45:57 -0700 Subject: [PATCH 1/4] Timeout and maxScatterGatherBytes handling for queries run by Druid SQL --- .../druid/benchmark/query/SqlBenchmark.java | 4 +- .../sql/QuantileSqlAggregatorTest.java | 3 +- .../sql/calcite/planner/PlannerFactory.java | 8 ++- .../io/druid/sql/calcite/rel/QueryMaker.java | 69 ++++++++++++++++--- .../druid/sql/calcite/schema/DruidSchema.java | 15 +++- .../sql/avatica/DruidAvaticaHandlerTest.java | 6 +- .../druid/sql/avatica/DruidStatementTest.java | 3 +- .../druid/sql/calcite/CalciteQueryTest.java | 9 ++- .../sql/calcite/http/SqlResourceTest.java | 6 +- .../sql/calcite/schema/DruidSchemaTest.java | 4 +- .../druid/sql/calcite/util/CalciteTests.java | 4 +- 11 files changed, 108 insertions(+), 23 deletions(-) diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java index 07d4daeb8293..6a38eec88e0e 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java @@ -47,6 +47,7 @@ import io.druid.segment.TestHelper; import io.druid.segment.column.ValueType; import io.druid.segment.serde.ComplexMetrics; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidPlanner; import io.druid.sql.calcite.planner.PlannerConfig; @@ -177,7 +178,8 @@ protected Map getTableMap() Calcites.createRootSchema(druidSchema), walker, CalciteTests.createOperatorTable(), - plannerConfig + plannerConfig, + new ServerConfig() ); groupByQuery = GroupByQuery .builder() diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index e4eb972d320a..fde21218b3eb 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -42,6 +42,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.SqlExtractionOperator; import io.druid.sql.calcite.filtration.Filtration; @@ -134,7 +135,7 @@ public void setUp() throws Exception ImmutableSet.of(new QuantileSqlAggregator()), ImmutableSet.of() ); - plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig); + plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()); } @After diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java index 85dba6686a0a..e51c3ca43b20 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import io.druid.query.QuerySegmentWalker; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.rel.QueryMaker; import 
io.druid.sql.calcite.schema.DruidSchema; import org.apache.calcite.avatica.util.Casing; @@ -44,25 +45,28 @@ public class PlannerFactory private final QuerySegmentWalker walker; private final DruidOperatorTable operatorTable; private final PlannerConfig plannerConfig; + private final ServerConfig serverConfig; @Inject public PlannerFactory( final SchemaPlus rootSchema, final QuerySegmentWalker walker, final DruidOperatorTable operatorTable, - final PlannerConfig plannerConfig + final PlannerConfig plannerConfig, + final ServerConfig serverConfig ) { this.rootSchema = rootSchema; this.walker = walker; this.operatorTable = operatorTable; this.plannerConfig = plannerConfig; + this.serverConfig = serverConfig; } public DruidPlanner createPlanner(final Map queryContext) { final PlannerContext plannerContext = PlannerContext.create(plannerConfig, queryContext); - final QueryMaker queryMaker = new QueryMaker(walker, plannerContext); + final QueryMaker queryMaker = new QueryMaker(walker, plannerContext, serverConfig); final FrameworkConfig frameworkConfig = Frameworks .newConfigBuilder() .parserConfig( diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index c0eba66180d7..bb7cca51dc24 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -23,14 +23,17 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; +import com.google.common.collect.MapMaker; import com.google.common.primitives.Doubles; import com.google.common.primitives.Ints; +import io.druid.client.DirectDruidClient; import io.druid.common.guava.GuavaUtils; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.DataSource; +import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; @@ -47,6 +50,7 @@ import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNResultValue; import io.druid.segment.column.Column; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; @@ -69,14 +73,17 @@ public class QueryMaker { private final QuerySegmentWalker walker; private final PlannerContext plannerContext; + private final ServerConfig serverConfig; public QueryMaker( final QuerySegmentWalker walker, - final PlannerContext plannerContext + final PlannerContext plannerContext, + final ServerConfig serverConfig ) { this.walker = walker; this.plannerContext = plannerContext; + this.serverConfig = serverConfig; } public PlannerContext getPlannerContext() @@ -179,7 +186,7 @@ public boolean hasNext() @Override public Sequence next() { - final SelectQuery queryWithPagination = baseQuery.withPagingSpec( + SelectQuery queryWithPagination = baseQuery.withPagingSpec( new PagingSpec( pagingIdentifiers.get(), plannerContext.getPlannerConfig().getSelectThreshold(), @@ -192,9 +199,15 @@ public Sequence next() morePages.set(false); final AtomicBoolean gotResult = new AtomicBoolean(); + queryWithPagination = (SelectQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( + queryWithPagination, + serverConfig + ); + 
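
The two helpers introduced by this first revision, withDefaultTimeoutAndMaxScatterGatherBytes and makeResponseContextForQuery, are defined further down in this QueryMaker diff and applied around each native query the SQL layer issues. A minimal sketch of their combined effect, using only methods and constants that appear in this patch (the wrapper method and its name are illustrative, not part of the change):

    // Sketch only: roughly what QueryMaker now does before handing a native query
    // to the QuerySegmentWalker. Raw Query types mirror the patch.
    static Sequence runWithLimits(Query query, ServerConfig serverConfig, QuerySegmentWalker walker)
    {
      // Stamp the query context with the server's default timeout and its
      // maxScatterGatherBytes limit.
      query = QueryContexts.withDefaultTimeout(query, serverConfig.getDefaultQueryTimeout());
      query = QueryContexts.withMaxScatterGatherBytes(query, serverConfig.getMaxScatterGatherBytes());

      // The response context carries an absolute fail time and a counter of bytes
      // gathered so far, which DirectDruidClient consults while fetching results.
      final Map<String, Object> responseContext = new MapMaker().makeMap();
      responseContext.put(
          DirectDruidClient.QUERY_FAIL_TIME,
          System.currentTimeMillis() + QueryContexts.getTimeout(query)
      );
      responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());

      return QueryPlus.wrap(query).run(walker, responseContext);
    }
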
return Sequences.concat( Sequences.map( - QueryPlus.wrap(queryWithPagination).run(walker, Maps.newHashMap()), + QueryPlus.wrap(queryWithPagination) + .run(walker, makeResponseContextForQuery(queryWithPagination)), new Function, Sequence>() { @Override @@ -255,7 +268,7 @@ public void remove() private Sequence executeTimeseries( final DruidQueryBuilder queryBuilder, - final TimeseriesQuery query + TimeseriesQuery query ) { final List fieldList = queryBuilder.getRowType().getFieldList(); @@ -264,8 +277,13 @@ private Sequence executeTimeseries( Hook.QUERY_PLAN.run(query); + query = (TimeseriesQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( + query, + serverConfig + ); + return Sequences.map( - QueryPlus.wrap(query).run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, makeResponseContextForQuery(query)), new Function, Object[]>() { @Override @@ -291,16 +309,21 @@ public Object[] apply(final Result result) private Sequence executeTopN( final DruidQueryBuilder queryBuilder, - final TopNQuery query + TopNQuery query ) { final List fieldList = queryBuilder.getRowType().getFieldList(); Hook.QUERY_PLAN.run(query); + query = (TopNQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( + query, + serverConfig + ); + return Sequences.concat( Sequences.map( - QueryPlus.wrap(query).run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, makeResponseContextForQuery(query)), new Function, Sequence>() { @Override @@ -328,15 +351,20 @@ public Sequence apply(final Result result) private Sequence executeGroupBy( final DruidQueryBuilder queryBuilder, - final GroupByQuery query + GroupByQuery query ) { final List fieldList = queryBuilder.getRowType().getFieldList(); Hook.QUERY_PLAN.run(query); + query = (GroupByQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( + query, + serverConfig + ); + return Sequences.map( - QueryPlus.wrap(query).run(walker, Maps.newHashMap()), + QueryPlus.wrap(query).run(walker, makeResponseContextForQuery(query)), new Function() { @Override @@ -355,6 +383,27 @@ public Object[] apply(final io.druid.data.input.Row row) ); } + public static Query withDefaultTimeoutAndMaxScatterGatherBytes(Query query, ServerConfig serverConfig) + { + query = QueryContexts.withDefaultTimeout(query, serverConfig.getDefaultQueryTimeout()); + query = QueryContexts.withMaxScatterGatherBytes(query, serverConfig.getMaxScatterGatherBytes()); + return query; + } + + public static Map makeResponseContextForQuery(Query query) + { + final Map responseContext = new MapMaker().makeMap(); + responseContext.put( + DirectDruidClient.QUERY_FAIL_TIME, + System.currentTimeMillis() + QueryContexts.getTimeout(query) + ); + responseContext.put( + DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, + new AtomicLong() + ); + return responseContext; + } + public static ColumnMetaData.Rep rep(final SqlTypeName sqlType) { if (SqlTypeName.CHAR_TYPES.contains(sqlType)) { diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java index e5cd0da46c19..88f908728224 100644 --- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java @@ -50,7 +50,9 @@ import io.druid.query.metadata.metadata.SegmentMetadataQuery; import io.druid.segment.column.ValueType; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.PlannerConfig; +import 
io.druid.sql.calcite.rel.QueryMaker; import io.druid.sql.calcite.table.DruidTable; import io.druid.sql.calcite.table.RowSignature; import io.druid.sql.calcite.view.DruidViewMacro; @@ -82,6 +84,7 @@ public class DruidSchema extends AbstractSchema private final ViewManager viewManager; private final ExecutorService cacheExec; private final ConcurrentMap tables; + private final ServerConfig serverConfig; // For awaitInitialization. private final CountDownLatch initializationLatch = new CountDownLatch(1); @@ -100,7 +103,8 @@ public DruidSchema( final QuerySegmentWalker walker, final TimelineServerView serverView, final PlannerConfig config, - final ViewManager viewManager + final ViewManager viewManager, + final ServerConfig serverConfig ) { this.walker = Preconditions.checkNotNull(walker, "walker"); @@ -109,6 +113,7 @@ public DruidSchema( this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d"); this.tables = Maps.newConcurrentMap(); + this.serverConfig = serverConfig; } @LifecycleStart @@ -295,7 +300,7 @@ protected Multimap getFunctionMultimap() private DruidTable computeTable(final String dataSource) { - final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( + SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( new TableDataSource(dataSource), null, null, @@ -306,7 +311,11 @@ private DruidTable computeTable(final String dataSource) true ); - final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery).run(walker, Maps.newHashMap()); + final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery) + .run( + walker, + QueryMaker.makeResponseContextForQuery(segmentMetadataQuery) + ); final List results = Sequences.toList(sequence, Lists.newArrayList()); if (results.isEmpty()) { return null; diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java index 52a5bda8a1d6..f2e7432e4629 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -32,6 +32,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.druid.java.util.common.Pair; import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -116,7 +117,10 @@ public void setUp() throws Exception ); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final DruidAvaticaHandler handler = new DruidAvaticaHandler( - new DruidMeta(new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig), AVATICA_CONFIG), + new DruidMeta( + new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()), + AVATICA_CONFIG + ), new DruidNode("dummy", "dummy", 1), new AvaticaMonitor() ); diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java index 1f0ee59c4419..6cea2ff2e5fe 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import 
io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -65,7 +66,7 @@ public void setUp() throws Exception ) ); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig); + plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()); } @After diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 1ef1fe6d2097..f1678b43ac09 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -81,6 +81,7 @@ import io.druid.query.topn.TopNQueryBuilder; import io.druid.segment.column.Column; import io.druid.segment.column.ValueType; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.filtration.Filtration; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -4223,7 +4224,13 @@ private List getResults( final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - final PlannerFactory plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig); + final PlannerFactory plannerFactory = new PlannerFactory( + rootSchema, + walker, + operatorTable, + plannerConfig, + new ServerConfig() + ); viewManager.createView( plannerFactory, diff --git a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java index 392d19fbf748..88492861d0c9 100644 --- a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.Pair; import io.druid.query.QueryInterruptedException; import io.druid.query.ResourceLimitExceededException; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -77,7 +78,10 @@ public void setUp() throws Exception CalciteTests.createMockSchema(walker, plannerConfig) ); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); - resource = new SqlResource(JSON_MAPPER, new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig)); + resource = new SqlResource( + JSON_MAPPER, + new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig, new ServerConfig()) + ); } @After diff --git a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java index 0d368186fc7a..982c5badba62 100644 --- a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java @@ -32,6 +32,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.table.DruidTable; @@ -150,7 +151,8 @@ public void setUp() throws Exception walker, new TestServerInventoryView(walker.getSegments()), 
PLANNER_CONFIG_DEFAULT, - new NoopViewManager() + new NoopViewManager(), + new ServerConfig() ); schema.start(); diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index e3096d3a3b6e..bf8355058923 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -76,6 +76,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.SqlExtractionOperator; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -380,7 +381,8 @@ public static DruidSchema createMockSchema( walker, new TestServerInventoryView(walker.getSegments()), plannerConfig, - viewManager + viewManager, + new ServerConfig() ); schema.start(); From 0fe81d7360842c0aeb79119013fbbc21ffebd3e2 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 22 May 2017 20:15:49 -0700 Subject: [PATCH 2/4] Address PR comments --- .../io/druid/client/DirectDruidClient.java | 27 +++++ .../java/io/druid/server/QueryResource.java | 13 +-- .../sql/calcite/planner/PlannerContext.java | 7 ++ .../io/druid/sql/calcite/rel/QueryMaker.java | 106 ++++++++---------- .../druid/sql/calcite/schema/DruidSchema.java | 12 +- 5 files changed, 96 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 34d656964994..cc1bbd8dba2f 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.base.Charsets; import com.google.common.base.Throwables; +import com.google.common.collect.MapMaker; import com.google.common.collect.Maps; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.FutureCallback; @@ -64,6 +65,7 @@ import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; +import io.druid.server.initialization.ServerConfig; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -114,6 +116,31 @@ public class DirectDruidClient implements QueryRunner private final AtomicInteger openConnections; private final boolean isSmile; + public static > QueryType withDefaultTimeoutAndMaxScatterGatherBytes(final QueryType query, ServerConfig serverConfig) + { + return (QueryType) QueryContexts.withMaxScatterGatherBytes( + QueryContexts.withDefaultTimeout( + (Query) query, + serverConfig.getDefaultQueryTimeout() + ), + serverConfig.getMaxScatterGatherBytes() + ); + } + + public static Map makeResponseContextForQuery(Query query, long startTimeMillis) + { + final Map responseContext = new MapMaker().makeMap(); + responseContext.put( + DirectDruidClient.QUERY_FAIL_TIME, + startTimeMillis + QueryContexts.getTimeout(query) + ); + responseContext.put( + DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, + new AtomicLong() + ); + return responseContext; + } + public DirectDruidClient( QueryToolChestWarehouse warehouse, QueryWatcher queryWatcher, diff --git 
a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 40aa8d52bc69..2ff2766b6c39 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.MapMaker; import com.google.common.io.CountingOutputStream; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; @@ -41,7 +40,6 @@ import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; -import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QueryPlus; @@ -187,7 +185,6 @@ public Response doPost( final String currThreadName = Thread.currentThread().getName(); try { - final Map responseContext = new MapMaker().makeMap(); query = context.getObjectMapper().readValue(in, Query.class); queryId = query.getId(); @@ -195,14 +192,12 @@ public Response doPost( queryId = UUID.randomUUID().toString(); query = query.withId(queryId); } - query = QueryContexts.withDefaultTimeout(query, config.getDefaultQueryTimeout()); - query = QueryContexts.withMaxScatterGatherBytes(query, config.getMaxScatterGatherBytes()); - responseContext.put( - DirectDruidClient.QUERY_FAIL_TIME, - System.currentTimeMillis() + QueryContexts.getTimeout(query) + query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(query, config); + final Map responseContext = DirectDruidClient.makeResponseContextForQuery( + query, + System.currentTimeMillis() ); - responseContext.put(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong()); toolChest = warehouse.getToolChest(query); diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java index 9927412e63a7..ac4f84b6423c 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java @@ -42,6 +42,7 @@ public class PlannerContext private final PlannerConfig plannerConfig; private final DateTime localNow; + private final long queryStartTimeMillis; private final Map queryContext; private PlannerContext( @@ -53,6 +54,7 @@ private PlannerContext( this.plannerConfig = Preconditions.checkNotNull(plannerConfig, "plannerConfig"); this.queryContext = queryContext != null ? 
ImmutableMap.copyOf(queryContext) : ImmutableMap.of(); this.localNow = Preconditions.checkNotNull(localNow, "localNow"); + this.queryStartTimeMillis = System.currentTimeMillis(); } public static PlannerContext create( @@ -106,6 +108,11 @@ public Map getQueryContext() return queryContext; } + public long getQueryStartTimeMillis() + { + return queryStartTimeMillis; + } + public DataContext createDataContext(final JavaTypeFactory typeFactory) { class DruidDataContext implements DataContext diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index bb7cca51dc24..5be2b554cea4 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; -import com.google.common.collect.MapMaker; import com.google.common.primitives.Doubles; import com.google.common.primitives.Ints; import io.druid.client.DirectDruidClient; @@ -32,8 +31,6 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.DataSource; -import io.druid.query.Query; -import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryPlus; import io.druid.query.QuerySegmentWalker; @@ -186,12 +183,15 @@ public boolean hasNext() @Override public Sequence next() { - SelectQuery queryWithPagination = baseQuery.withPagingSpec( - new PagingSpec( - pagingIdentifiers.get(), - plannerContext.getPlannerConfig().getSelectThreshold(), - true - ) + final SelectQuery queryWithPagination = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery.withPagingSpec( + new PagingSpec( + pagingIdentifiers.get(), + plannerContext.getPlannerConfig().getSelectThreshold(), + true + ) + ), + serverConfig ); Hook.QUERY_PLAN.run(queryWithPagination); @@ -199,15 +199,15 @@ public Sequence next() morePages.set(false); final AtomicBoolean gotResult = new AtomicBoolean(); - queryWithPagination = (SelectQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( - queryWithPagination, - serverConfig - ); - return Sequences.concat( Sequences.map( QueryPlus.wrap(queryWithPagination) - .run(walker, makeResponseContextForQuery(queryWithPagination)), + .run(walker, + DirectDruidClient.makeResponseContextForQuery( + queryWithPagination, + plannerContext.getQueryStartTimeMillis() + ) + ), new Function, Sequence>() { @Override @@ -268,22 +268,26 @@ public void remove() private Sequence executeTimeseries( final DruidQueryBuilder queryBuilder, - TimeseriesQuery query + final TimeseriesQuery baseQuery ) { + final TimeseriesQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery, + serverConfig + ); + final List fieldList = queryBuilder.getRowType().getFieldList(); final List dimensions = queryBuilder.getGrouping().getDimensions(); final String timeOutputName = dimensions.isEmpty() ? 
null : Iterables.getOnlyElement(dimensions).getOutputName(); Hook.QUERY_PLAN.run(query); - query = (TimeseriesQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( - query, - serverConfig - ); - return Sequences.map( - QueryPlus.wrap(query).run(walker, makeResponseContextForQuery(query)), + QueryPlus.wrap(query) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) + ), new Function, Object[]>() { @Override @@ -309,21 +313,25 @@ public Object[] apply(final Result result) private Sequence executeTopN( final DruidQueryBuilder queryBuilder, - TopNQuery query + final TopNQuery baseQuery ) { + final TopNQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery, + serverConfig + ); + final List fieldList = queryBuilder.getRowType().getFieldList(); Hook.QUERY_PLAN.run(query); - query = (TopNQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( - query, - serverConfig - ); - return Sequences.concat( Sequences.map( - QueryPlus.wrap(query).run(walker, makeResponseContextForQuery(query)), + QueryPlus.wrap(query) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) + ), new Function, Sequence>() { @Override @@ -351,20 +359,23 @@ public Sequence apply(final Result result) private Sequence executeGroupBy( final DruidQueryBuilder queryBuilder, - GroupByQuery query + final GroupByQuery baseQuery ) { - final List fieldList = queryBuilder.getRowType().getFieldList(); - - Hook.QUERY_PLAN.run(query); - - query = (GroupByQuery) QueryMaker.withDefaultTimeoutAndMaxScatterGatherBytes( - query, + final GroupByQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery, serverConfig ); + final List fieldList = queryBuilder.getRowType().getFieldList(); + + Hook.QUERY_PLAN.run(query); return Sequences.map( - QueryPlus.wrap(query).run(walker, makeResponseContextForQuery(query)), + QueryPlus.wrap(query) + .run( + walker, + DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) + ), new Function() { @Override @@ -383,27 +394,6 @@ public Object[] apply(final io.druid.data.input.Row row) ); } - public static Query withDefaultTimeoutAndMaxScatterGatherBytes(Query query, ServerConfig serverConfig) - { - query = QueryContexts.withDefaultTimeout(query, serverConfig.getDefaultQueryTimeout()); - query = QueryContexts.withMaxScatterGatherBytes(query, serverConfig.getMaxScatterGatherBytes()); - return query; - } - - public static Map makeResponseContextForQuery(Query query) - { - final Map responseContext = new MapMaker().makeMap(); - responseContext.put( - DirectDruidClient.QUERY_FAIL_TIME, - System.currentTimeMillis() + QueryContexts.getTimeout(query) - ); - responseContext.put( - DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED, - new AtomicLong() - ); - return responseContext; - } - public static ColumnMetaData.Rep rep(final SqlTypeName sqlType) { if (SqlTypeName.CHAR_TYPES.contains(sqlType)) { diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java index 88f908728224..d54a07f2f038 100644 --- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; +import io.druid.client.DirectDruidClient; 
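
After this second revision the plumbing lives in DirectDruidClient as static helpers, and PlannerContext captures queryStartTimeMillis once when the SQL statement is planned, so every native query issued for that statement computes its fail time from the same starting instant rather than from a fresh System.currentTimeMillis() per query. A sketch of the resulting call pattern in QueryMaker, assembled from calls that appear in this patch (variable names are illustrative):

    // Sketch only: per-query execution path in QueryMaker after this revision.
    final TimeseriesQuery query =
        DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(baseQuery, serverConfig);
    final Map<String, Object> responseContext = DirectDruidClient.makeResponseContextForQuery(
        query,
        plannerContext.getQueryStartTimeMillis()  // shared start time for the whole SQL statement
    );
    final Sequence<Result<TimeseriesResultValue>> results =
        QueryPlus.wrap(query).run(walker, responseContext);
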
import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.client.ServerView; @@ -52,7 +53,6 @@ import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.PlannerConfig; -import io.druid.sql.calcite.rel.QueryMaker; import io.druid.sql.calcite.table.DruidTable; import io.druid.sql.calcite.table.RowSignature; import io.druid.sql.calcite.view.DruidViewMacro; @@ -311,10 +311,18 @@ private DruidTable computeTable(final String dataSource) true ); + segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + segmentMetadataQuery, + serverConfig + ); + final Sequence sequence = QueryPlus.wrap(segmentMetadataQuery) .run( walker, - QueryMaker.makeResponseContextForQuery(segmentMetadataQuery) + DirectDruidClient.makeResponseContextForQuery( + segmentMetadataQuery, + System.currentTimeMillis() + ) ); final List results = Sequences.toList(sequence, Lists.newArrayList()); if (results.isEmpty()) { From 2d358b74c745023a2ff1649ad3d53212c7d26966 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 22 May 2017 23:38:08 -0700 Subject: [PATCH 3/4] Fix contexts in CalciteQueryTest --- .../druid/sql/calcite/CalciteQueryTest.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index f1678b43ac09..4e4afd6e8fa0 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -29,6 +29,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -170,35 +171,47 @@ public int getMaxQueryCount() private static final String LOS_ANGELES = "America/Los_Angeles"; private static final Map QUERY_CONTEXT_DEFAULT = ImmutableMap.of( - PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z" + PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final Map QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - "skipEmptyBuckets", false + "skipEmptyBuckets", false, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final Map QUERY_CONTEXT_NO_TOPN = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false" + PlannerConfig.CTX_KEY_USE_APPROXIMATE_TOPN, "false", + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final Map QUERY_CONTEXT_LOS_ANGELES = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES + PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); // Matches QUERY_CONTEXT_DEFAULT 
public static final Map TIMESERIES_CONTEXT_DEFAULT = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", - "skipEmptyBuckets", true + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); // Matches QUERY_CONTEXT_LOS_ANGELES public static final Map TIMESERIES_CONTEXT_LOS_ANGELES = ImmutableMap.of( PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z", PlannerContext.CTX_SQL_TIME_ZONE, LOS_ANGELES, - "skipEmptyBuckets", true + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE ); private static final PagingSpec FIRST_PAGING_SPEC = new PagingSpec(null, 1000, true); From 72b65a1e4887e14c3cf4d8fb86ee6795baaf1de8 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 23 May 2017 00:11:14 -0700 Subject: [PATCH 4/4] Fix contexts in QuantileSqlAggregatorTest --- .../histogram/sql/QuantileSqlAggregatorTest.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index fde21218b3eb..6ccebebaeee6 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -26,6 +26,7 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; +import io.druid.query.QueryContexts; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -201,7 +202,11 @@ public void testQuantileOnFloatAndLongs() throws Exception new QuantilePostAggregator("a6", "a4:agg", 0.999f), new QuantilePostAggregator("a7", "a7:agg", 0.50f) )) - .context(ImmutableMap.of("skipEmptyBuckets", true)) + .context(ImmutableMap.of( + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE + )) .build(), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); @@ -261,7 +266,11 @@ public void testQuantileOnComplexColumn() throws Exception new QuantilePostAggregator("a5", "a5:agg", 0.999f), new QuantilePostAggregator("a6", "a4:agg", 0.999f) )) - .context(ImmutableMap.of("skipEmptyBuckets", true)) + .context(ImmutableMap.of( + "skipEmptyBuckets", true, + QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE + )) .build(), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) );
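
The last two revisions only touch tests: because the planner now stamps every generated native query with a timeout and a scatter-gather byte limit, the expected query contexts in CalciteQueryTest and QuantileSqlAggregatorTest gain two extra entries. The shape, taken directly from the hunks above, reflects a default ServerConfig, which is presumably why the expectations can hard-code QueryContexts.DEFAULT_TIMEOUT_MILLIS and Long.MAX_VALUE:

    // Expected context for a timeseries query in the updated tests.
    final Map<String, Object> expectedContext = ImmutableMap.of(
        "skipEmptyBuckets", true,
        QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
        QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
    );
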