From a461c12cdba8aa369171dcb6ea9224df4e536384 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 8 Feb 2017 16:10:18 -0800 Subject: [PATCH 1/2] SQL: Add context and contextual functions to planner. Added support for context parameters specified as JDBC connection properties or a JSON object for SQL-over-JSON-over-HTTP. Also added features that depend on context functionality: - Added CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP functions. - Added support for time zones other than UTC via a "timeZone" context. - Pass down query context to Druid queries too. Also some bug fixes: - Fix DATE handling, it was largely done incorrectly before. - Fix CAST(__time TO DATE) which should do a floor-to-day. - Fix non-equality comparisons to FLOOR(__time TO X). - Fix maxQueryCount property. --- .../druid/benchmark/query/SqlBenchmark.java | 9 +- docs/content/querying/query-context.md | 21 +- docs/content/querying/sql.md | 56 +- .../histogram/sql/QuantileSqlAggregator.java | 3 + .../sql/QuantileSqlAggregatorTest.java | 18 +- .../io/druid/sql/avatica/DruidConnection.java | 11 +- .../java/io/druid/sql/avatica/DruidMeta.java | 53 +- .../io/druid/sql/avatica/DruidStatement.java | 12 +- .../ApproxCountDistinctSqlAggregator.java | 3 + .../calcite/aggregation/SqlAggregator.java | 3 + .../CharLengthExpressionConversion.java | 4 +- .../expression/ExpressionConversion.java | 15 +- .../expression/ExpressionConverter.java | 29 +- .../sql/calcite/expression/Expressions.java | 152 +++-- .../ExtractExpressionConversion.java | 19 +- .../sql/calcite/expression/ExtractionFns.java | 25 +- .../expression/FloorExpressionConversion.java | 38 +- .../SubstringExpressionConversion.java | 15 +- .../sql/calcite/expression/TimeUnits.java | 36 +- .../druid/sql/calcite/filtration/Bounds.java | 92 +++ .../druid/sql/calcite/planner/Calcites.java | 235 ++----- .../calcite/planner/DruidConvertletTable.java | 75 ++- .../sql/calcite/planner/DruidPlanner.java | 216 ++++++ 
.../sql/calcite/planner/PlannerConfig.java | 61 +- .../sql/calcite/planner/PlannerContext.java | 150 +++++ .../sql/calcite/planner/PlannerFactory.java | 18 +- .../io/druid/sql/calcite/planner/Rules.java | 36 +- .../sql/calcite/rel/DruidQueryBuilder.java | 4 +- .../druid/sql/calcite/rel/DruidQueryRel.java | 10 +- .../io/druid/sql/calcite/rel/DruidRel.java | 6 + .../io/druid/sql/calcite/rel/QueryMaker.java | 77 ++- .../sql/calcite/rule/DruidFilterRule.java | 1 + .../sql/calcite/rule/DruidSemiJoinRule.java | 10 +- .../sql/calcite/rule/DruidTableScanRule.java | 54 ++ .../druid/sql/calcite/rule/GroupByRules.java | 66 +- .../druid/sql/calcite/rule/SelectRules.java | 11 +- .../druid/sql/calcite/schema/DruidSchema.java | 4 - .../druid/sql/calcite/table/DruidTable.java | 15 +- .../main/java/io/druid/sql/http/SqlQuery.java | 28 +- .../java/io/druid/sql/http/SqlResource.java | 23 +- .../sql/avatica/DruidAvaticaHandlerTest.java | 43 +- .../druid/sql/avatica/DruidStatementTest.java | 8 +- .../druid/sql/calcite/CalciteQueryTest.java | 624 ++++++++++++++++-- .../sql/calcite/http/SqlResourceTest.java | 58 +- 44 files changed, 1884 insertions(+), 563 deletions(-) create mode 100644 sql/src/main/java/io/druid/sql/calcite/planner/DruidPlanner.java create mode 100644 sql/src/main/java/io/druid/sql/calcite/planner/PlannerContext.java create mode 100644 sql/src/main/java/io/druid/sql/calcite/rule/DruidTableScanRule.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java index 440cf6440cdc..9f1d6874648a 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java @@ -48,10 +48,10 @@ import io.druid.segment.column.ValueType; import io.druid.segment.serde.ComplexMetrics; import io.druid.sql.calcite.planner.Calcites; +import io.druid.sql.calcite.planner.DruidPlanner; import 
io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.planner.PlannerFactory; import io.druid.sql.calcite.planner.PlannerResult; -import io.druid.sql.calcite.rel.QueryMaker; import io.druid.sql.calcite.table.DruidTable; import io.druid.sql.calcite.table.RowSignature; import io.druid.sql.calcite.util.CalciteTests; @@ -61,7 +61,6 @@ import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; -import org.apache.calcite.tools.Planner; import org.apache.commons.io.FileUtils; import org.joda.time.Interval; import org.openjdk.jmh.annotations.Benchmark; @@ -157,7 +156,6 @@ public void setup() throws Exception final Map tableMap = ImmutableMap.of( "foo", new DruidTable( - new QueryMaker(walker, plannerConfig), new TableDataSource("foo"), RowSignature.builder() .add("__time", ValueType.LONG) @@ -177,6 +175,7 @@ protected Map getTableMap() }; plannerFactory = new PlannerFactory( Calcites.createRootSchema(druidSchema), + walker, CalciteTests.createOperatorTable(), plannerConfig ); @@ -233,8 +232,8 @@ public void queryNative(Blackhole blackhole) throws Exception @OutputTimeUnit(TimeUnit.MILLISECONDS) public void queryPlanner(Blackhole blackhole) throws Exception { - try (final Planner planner = plannerFactory.createPlanner()) { - final PlannerResult plannerResult = Calcites.plan(planner, sqlQuery); + try (final DruidPlanner planner = plannerFactory.createPlanner(null)) { + final PlannerResult plannerResult = planner.plan(sqlQuery); final ArrayList results = Sequences.toList(plannerResult.run(), Lists.newArrayList()); blackhole.consume(results); } diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index 01059e1ea542..309152bc76fd 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -5,7 +5,7 @@ layout: doc_page Query Context ============= -The query context is used for various query configuration 
parameters. +The query context is used for various query configuration parameters. The following parameters apply to all queries. |property |default | description | |-----------------|---------------------|----------------------| @@ -17,8 +17,21 @@ The query context is used for various query configuration parameters. |bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |chunkPeriod | `0` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries, reducing the impact on resources. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. All the query chunks will be processed asynchronously inside query processing executor service. Make sure "druid.processing.numThreads" is configured appropriately on the broker. | + +In addition, some query types offer context parameters specific to that query type. + +### TopN queries + +|property |default | description | +|-----------------|---------------------|----------------------| |minTopNThreshold | `1000` | The top minTopNThreshold local results from each segment are returned for merging to determine the global topN. | -|`maxResults`|500000|Maximum number of results groupBy query can process. Default value used can be changed by `druid.query.groupBy.maxResults` in druid configuration at broker and historical nodes. At query time you can only lower the value.| -|`maxIntermediateRows`|50000|Maximum number of intermediate rows while processing single segment for groupBy query. 
Default value used can be changed by `druid.query.groupBy.maxIntermediateRows` in druid configuration at broker and historical nodes. At query time you can only lower the value.| -|`groupByIsSingleThreaded`|false|Whether to run single threaded group By queries. Default value used can be changed by `druid.query.groupBy.singleThreaded` in druid configuration at historical nodes.| +### Timeseries queries + +|property |default | description | +|-----------------|---------------------|----------------------| +|skipEmptyBuckets | `false` | Disable timeseries zero-filling behavior, so only buckets with results will be returned. | + +### GroupBy queries + +See [GroupBy query context](groupbyquery.html#query-context). diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index b0fb6eeafb82..3eac4a68d960 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -31,14 +31,24 @@ jdbc:avatica:remote:url=http://BROKER:8082/druid/v2/sql/avatica/ Example code: ```java -Connection connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/"); -ResultSet resultSet = connection.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM data_source"); -while (resultSet.next()) { - // Do something +// Connect to /druid/v2/sql/avatica/ on your broker. +String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/"; + +// Set any connection context parameters you need here (see "Connection context" below). +// Or leave empty for default behavior. +Properties connectionProperties = new Properties(); + +try (Connection connection = DriverManager.getConnection(url, connectionProperties)) { + try (ResultSet resultSet = connection.createStatement().executeQuery("SELECT COUNT(*) AS cnt FROM data_source")) { + while (resultSet.next()) { + // Do something + } + } } ``` -Table metadata is available over JDBC using `connection.getMetaData()`. 
+Table metadata is available over JDBC using `connection.getMetaData()` or by querying the "INFORMATION_SCHEMA" tables +(see below). Parameterized queries don't work properly, so avoid those. @@ -61,6 +71,17 @@ curl -XPOST -H'Content-Type: application/json' http://BROKER:8082/druid/v2/sql/ Metadata is only available over the HTTP API by querying the "INFORMATION_SCHEMA" tables (see below). +You can provide [connection context parameters](#connection-context) by adding a "context" map, like: + +```json +{ + "query" : "SELECT COUNT(*) FROM data_source WHERE foo = 'bar' AND __time > TIMESTAMP '2000-01-01 00:00:00'", + "context" : { + "sqlTimeZone" : "America/Los_Angeles" + } +} +``` + ### Metadata Druid brokers cache column type metadata for each dataSource and use it to plan SQL queries. This cache is updated @@ -77,7 +98,7 @@ SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE SCHEMA_NAME = 'druid' AND TABLE_N See the [INFORMATION_SCHEMA tables](#information_schema-tables) section below for details on the available metadata. -You can also access table and column metadata through JDBC using `connection.getMetaData()`. +You can access table and column metadata through JDBC using `connection.getMetaData()`. ### Approximate queries @@ -91,8 +112,8 @@ algorithm. - TopN-style queries with a single grouping column, like `SELECT col1, SUM(col2) FROM data_source GROUP BY col1 ORDER BY SUM(col2) DESC LIMIT 100`, by default will be executed as [TopN queries](topnquery.html), which use an approximate algorithm. To disable this behavior, and use exact -algorithms for topN-style queries, set -[druid.sql.planner.useApproximateTopN](../configuration/broker.html#sql-planner-configuration) to "false". +algorithms for topN-style queries, set "useApproximateTopN" to "false", either through query context or through broker +configuration. 
### Time functions @@ -101,6 +122,10 @@ Druid's SQL language supports a number of time operations, including: - `FLOOR(__time TO <unit>)` for grouping or filtering on time buckets, like `SELECT FLOOR(__time TO MONTH), SUM(cnt) FROM data_source GROUP BY FLOOR(__time TO MONTH)` - `EXTRACT(<unit> FROM __time)` for grouping or filtering on time parts, like `SELECT EXTRACT(HOUR FROM __time), SUM(cnt) FROM data_source GROUP BY EXTRACT(HOUR FROM __time)` - Comparisons to `TIMESTAMP '