diff --git a/docs/assets/multi-stage-query/msq-ui-download-query-results.png b/docs/assets/multi-stage-query/msq-ui-download-query-results.png new file mode 100644 index 000000000000..e428cb2dfdeb Binary files /dev/null and b/docs/assets/multi-stage-query/msq-ui-download-query-results.png differ diff --git a/docs/assets/multi-stage-query/tutorial-msq-convert.png b/docs/assets/multi-stage-query/tutorial-msq-convert.png new file mode 100644 index 000000000000..f16941af67c3 Binary files /dev/null and b/docs/assets/multi-stage-query/tutorial-msq-convert.png differ diff --git a/docs/assets/multi-stage-query/ui-annotated.png b/docs/assets/multi-stage-query/ui-annotated.png new file mode 100644 index 000000000000..5a98c00d191c Binary files /dev/null and b/docs/assets/multi-stage-query/ui-annotated.png differ diff --git a/docs/assets/multi-stage-query/ui-empty.png b/docs/assets/multi-stage-query/ui-empty.png new file mode 100644 index 000000000000..7c30d5a671ac Binary files /dev/null and b/docs/assets/multi-stage-query/ui-empty.png differ diff --git a/docs/assets/tutorial-quickstart-02.png b/docs/assets/tutorial-quickstart-02.png new file mode 100644 index 000000000000..5edec67c3fd1 Binary files /dev/null and b/docs/assets/tutorial-quickstart-02.png differ diff --git a/docs/assets/tutorial-quickstart-03.png b/docs/assets/tutorial-quickstart-03.png new file mode 100644 index 000000000000..cff8fecfc191 Binary files /dev/null and b/docs/assets/tutorial-quickstart-03.png differ diff --git a/docs/assets/tutorial-quickstart-04.png b/docs/assets/tutorial-quickstart-04.png new file mode 100644 index 000000000000..431d8b97c8f5 Binary files /dev/null and b/docs/assets/tutorial-quickstart-04.png differ diff --git a/docs/assets/tutorial-quickstart-05.png b/docs/assets/tutorial-quickstart-05.png new file mode 100644 index 000000000000..6178289fdc8e Binary files /dev/null and b/docs/assets/tutorial-quickstart-05.png differ diff --git a/docs/development/extensions.md b/docs/development/extensions.md index 49f61c8d7467..7a8175fa55d2 100644 --- a/docs/development/extensions.md +++ b/docs/development/extensions.md @@ -50,6 +50,7 @@ Core extensions are maintained by Druid committers. |druid-kerberos|Kerberos authentication for druid processes.|[link](../development/extensions-core/druid-kerberos.md)| |druid-lookups-cached-global|A module for [lookups](../querying/lookups.md) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.md)| |druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.md)| +|druid-multi-stage-query| Support for the multi-stage query architecture for Apache Druid and the multi-stage query task engine.|[link](../multi-stage-query/index.md)| |druid-orc-extensions|Support for data in Apache ORC data format.|[link](../development/extensions-core/orc.md)| |druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-core/parquet.md)| |druid-protobuf-extensions| Support for data in Protobuf data format.|[link](../development/extensions-core/protobuf.md)| diff --git a/docs/multi-stage-query/index.md b/docs/multi-stage-query/index.md new file mode 100644 index 000000000000..b82408607f47 --- /dev/null +++ b/docs/multi-stage-query/index.md @@ -0,0 +1,342 @@ +--- +id: index +title: SQL-based ingestion overview and syntax +sidebar_label: Overview and syntax +description: Introduces multi-stage query architecture and its task engine +--- + + + +> SQL-based ingestion using the multi-stage query task engine is our recommended solution starting in Druid 24.0. Alternative ingestion solutions, such as native batch and Hadoop-based ingestion systems, will still be supported. We recommend you read all [known issues](./msq-known-issues.md) and test the feature in a development environment before rolling it out in production. Using the multi-stage query task engine with `SELECT` statements that do not write to a datasource is experimental. + +SQL-based ingestion for Apache Druid uses a distributed multi-stage query architecture, which includes a query engine called the multi-stage query task engine (MSQ task engine). The MSQ task engine extends Druid's query capabilities, so you can write queries that reference [external data](#read-external-data) as well as perform ingestion with SQL [INSERT](#insert-data) and [REPLACE](#replace-data). Essentially, you can perform SQL-based ingestion instead of using JSON ingestion specs that Druid's native ingestion uses. + +The MSQ task engine excels at executing queries that can get bottlenecked at the Broker when using Druid's native SQL engine. When you submit queries, the MSQ task engine splits them into stages and automatically exchanges data between stages. Each stage is parallelized to run across multiple data servers at once, simplifying performance. + + +## MSQ task engine features + +In its current state, the MSQ task engine enables you to do the following: + +- Read external data at query time using EXTERN. +- Execute batch ingestion jobs by writing SQL queries using INSERT and REPLACE. You no longer need to generate a JSON-based ingestion spec. +- Transform and rewrite existing tables using SQL. +- Perform multi-dimension range partitioning reliably, which leads to more evenly distributed segment sizes and better performance. + +The MSQ task engine has additional features that can be used as part of a proof of concept or demo, but don't use or rely on the following features for any meaningful use cases, especially production ones: + +- Execute heavy-weight queries and return large numbers of rows. +- Execute queries that exchange large amounts of data between servers, like exact count distinct of high-cardinality fields. + + +## Load the extension + +For new clusters that use 24.0 or later, the multi-stage query extension is loaded by default. If you want to add the extension to an existing cluster, add the extension `druid-multi-stage-query` to `druid.extensions.loadlist` in your `common.runtime.properties` file. + +For more information about how to load an extension, see [Loading extensions](../development/extensions.md#loading-extensions). + +To use EXTERN, you need READ permission on the resource named "EXTERNAL" of the resource type "EXTERNAL". If you encounter a 403 error when trying to use EXTERN, verify that you have the correct permissions. + +## MSQ task engine query syntax + +You can submit queries to the MSQ task engine through the **Query** view in the Druid console or through the API. The Druid console is a good place to start because you can preview a query before you run it. You can also experiment with many of the [context parameters](./msq-reference.md#context-parameters) through the UI. Once you're comfortable with submitting queries through the Druid console, [explore using the API to submit a query](./msq-api.md#submit-a-query). + +If you encounter an issue after you submit a query, you can learn more about what an error means from the [limits](./msq-concepts.md#limits) and [errors](./msq-concepts.md#error-codes). + +Queries for the MSQ task engine involve three primary functions: + +- EXTERN to query external data +- INSERT INTO ... SELECT to insert data, such as data from an external source +- REPLACE to replace existing datasources, partially or fully, with query results + +For information about the syntax for queries, see [SQL syntax](./msq-reference.md#sql-syntax). + +### Read external data + +Query tasks can access external data through the EXTERN function. When using EXTERN, keep in mind that large files do not get split across different worker tasks. If you have fewer input files than worker tasks, you can increase query parallelism by splitting up your input files such that you have at least one input file per worker task. + +You can use the EXTERN function anywhere a table is expected in the following form: `TABLE(EXTERN(...))`. You can use external data with SELECT, INSERT, and REPLACE queries. + +The following query reads external data: + +```sql +SELECT + * +FROM TABLE( + EXTERN( + '{"type": "http", "uris": ["https://static.imply.io/data/wikipedia.json.gz"]}', + '{"type": "json"}', + '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "user", "type": "string"}]' + ) +) +LIMIT 100 +``` + +For more information about the syntax, see [EXTERN](./msq-reference.md#extern). + +### Insert data + +With the MSQ task engine, Druid can use the results of a query task to create a new datasource or to append to an existing datasource. Syntactically, there is no difference between the two. These operations use the INSERT INTO ... SELECT syntax. + +All SELECT capabilities are available for INSERT queries. However, the MSQ task engine does not include all the existing SQL query features of Druid. See [Known issues](./msq-known-issues.md) for a list of capabilities that aren't available. + +The following example query inserts data from an external source into a table named `w000` and partitions it by day: + +```sql +INSERT INTO w000 +SELECT + TIME_PARSE("timestamp") AS __time, + "page", + "user" +FROM TABLE( + EXTERN( + '{"type": "http", "uris": ["https://static.imply.io/data/wikipedia.json.gz"]}', + '{"type": "json"}', + '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "user", "type": "string"}]' + ) +) +PARTITIONED BY DAY +``` + +For more information about the syntax, see [INSERT](./msq-reference.md#insert). + +### Replace data + +The syntax for REPLACE is similar to INSERT. All SELECT functionality is available for REPLACE queries. +Note that the MSQ task engine does not yet implement all native Druid query features. +For details, see [Known issues](./msq-known-issues.md). + +When working with REPLACE queries, keep the following in mind: + +- The intervals generated as a result of the OVERWRITE WHERE query must align with the granularity specified in the PARTITIONED BY clause. +- OVERWRITE WHERE queries only support the `__time` column. + +For more information about the syntax, see [REPLACE](./msq-reference.md#replace). + +The following examples show how to replace data in a table. + +#### REPLACE all data + +You can replace all the data in a table by using REPLACE INTO ... OVERWRITE ALL SELECT: + +```sql +REPLACE INTO w000 +OVERWRITE ALL +SELECT + TIME_PARSE("timestamp") AS __time, + "page", + "user" +FROM TABLE( + EXTERN( + '{"type": "http", "uris": ["https://static.imply.io/data/wikipedia.json.gz"]}', + '{"type": "json"}', + '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "user", "type": "string"}]' + ) +) +PARTITIONED BY DAY +``` + +#### REPLACE some data + +You can replace some of the data in a table by using REPLACE INTO ... OVERWRITE WHERE ... SELECT: + +```sql +REPLACE INTO w000 +OVERWRITE WHERE __time >= TIMESTAMP '2019-08-25' AND __time < TIMESTAMP '2019-08-28' +SELECT + TIME_PARSE("timestamp") AS __time, + "page", + "user" +FROM TABLE( + EXTERN( + '{"type": "http", "uris": ["https://static.imply.io/data/wikipedia.json.gz"]}', + '{"type": "json"}', + '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}, {"name": "user", "type": "string"}]' + ) +) +PARTITIONED BY DAY +``` + +## Adjust query behavior + +In addition to the basic functions, you can further modify your query behavior to control how your queries run or what your results look like. You can control how your queries behave by changing the following: + +### Primary timestamp + +Druid tables always include a primary timestamp named `__time`, so your ingestion query should generally include a column named `__time`. + +The following formats are supported for `__time` in the source data: +- ISO 8601 with 'T' separator, such as "2000-01-01T01:02:03.456" +- Milliseconds since Unix epoch (00:00:00 UTC on January 1, 1970) + +The `__time` column is used for time-based partitioning, such as `PARTITIONED BY DAY`. + +If you use `PARTITIONED BY ALL` or `PARTITIONED BY ALL TIME`, time-based +partitioning is disabled. In these cases, your ingestion query doesn't need +to include a `__time` column. However, Druid still creates a `__time` column +in your Druid table and sets all timestamps to 1970-01-01 00:00:00. + +For more information, see [Primary timestamp](../ingestion/data-model.md#primary-timestamp). + +### PARTITIONED BY + +INSERT and REPLACE queries require the PARTITIONED BY clause, which determines how time-based partitioning is done. In Druid, data is split into segments, one or more per time chunk defined by the PARTITIONED BY granularity. A good general rule is to adjust the granularity so that each segment contains about five million rows. Choose a granularity based on your ingestion rate. For example, if you ingest a million rows per day, PARTITION BY DAY is good. If you ingest a million rows an hour, choose PARTITION BY HOUR instead. + +Using the clause provides the following benefits: + +- Better query performance due to time-based segment pruning, which removes segments from + consideration when they do not contain any data for a query's time filter. +- More efficient data management, as data can be rewritten for each time partition individually + rather than the whole table. + +You can use the following arguments for PARTITIONED BY: + +- Time unit: `HOUR`, `DAY`, `MONTH`, or `YEAR`. Equivalent to `FLOOR(__time TO TimeUnit)`. +- `TIME_FLOOR(__time, 'granularity_string')`, where granularity_string is an ISO 8601 period like + 'PT1H'. The first argument must be `__time`. +- `FLOOR(__time TO TimeUnit)`, where `TimeUnit` is any unit supported by the [FLOOR function](../querying/sql-scalar.md#date-and-time-functions). The first + argument must be `__time`. +- `ALL` or `ALL TIME`, which effectively disables time partitioning by placing all data in a single + time chunk. To use LIMIT or OFFSET at the outer level of your INSERT or REPLACE query, you must set PARTITIONED BY to ALL or ALL TIME. + +You can use the following ISO 8601 periods for `TIME_FLOOR`: + +- PT1S +- PT1M +- PT5M +- PT10M +- PT15M +- PT30M +- PT1H +- PT6H +- P1D +- P1W +- P1M +- P3M +- P1Y + + +### CLUSTERED BY + +Data is first divided by the PARTITION BY clause. Data can be further split by the CLUSTERED BY clause. For example, suppose you ingest 100 M rows per hour and use `PARTITIONED BY HOUR` as your time partition. You then divide up the data further by adding `CLUSTERED BY hostName`. The result is segments of about 5 million rows, with like `hostNames` grouped within the same segment. + +Using CLUSTERED BY has the following benefits: + +- Lower storage footprint due to combining similar data into the same segments, which improves + compressibility. +- Better query performance due to dimension-based segment pruning, which removes segments from + consideration when they cannot possibly contain data matching a query's filter. + +For dimension-based segment pruning to be effective, your queries should meet the following conditions: + +- All CLUSTERED BY columns are single-valued string columns +- Use a REPLACE query for ingestion + +Druid still clusters data during ingestion if these conditions aren't met but won't perform dimension-based segment pruning at query time. That means if you use an INSERT query for ingestion or have numeric columns or multi-valued string columns, dimension-based segment pruning doesn't occur at query time. + +You can tell if dimension-based segment pruning is possible by using the `sys.segments` table to +inspect the `shard_spec` for the segments generated by an ingestion query. If they are of type +`range` or `single`, then dimension-based segment pruning is possible. Otherwise, it is not. The +shard spec type is also available in the **Segments** view under the **Partitioning** +column. + +You can use the following filters for dimension-based segment pruning: + +- Equality to string literals, like `x = 'foo'` or `x IN ('foo', 'bar')`. +- Comparison to string literals, like `x < 'foo'` or other comparisons involving `<`, `>`, `<=`, or `>=`. + +This differs from multi-dimension range based partitioning in classic batch ingestion where both +string and numeric columns support Broker-level pruning. With SQL-based batch ingestion, +only string columns support Broker-level pruning. + +It is okay to mix time partitioning with secondary partitioning. For example, you can +combine `PARTITIONED BY HOUR` with `CLUSTERED BY channel` to perform +time partitioning by hour and secondary partitioning by channel within each hour. + +### GROUP BY + +A query's GROUP BY clause determines how data is rolled up. The expressions in the GROUP BY clause become +dimensions, and aggregation functions become metrics. + +### Ingest-time aggregations + +When performing rollup using aggregations, it is important to use aggregators +that return nonfinalized state. This allows you to perform further rollups +at query time. To achieve this, set `finalizeAggregations: false` in your +ingestion query context. + +Check out the [INSERT with rollup example query](./msq-example-queries.md#insert-with-rollup) to see this feature in +action. + +Druid needs information for aggregating measures of different segments to compact. For example, to aggregate `count("col") as example_measure`, Druid needs to sum the value of `example_measure` +across the segments. This information is stored inside the metadata of the segment. For the SQL-based ingestion, Druid only populates the +aggregator information of a column in the segment metadata when: + +- The INSERT or REPLACE query has an outer GROUP BY clause. +- The following context parameters are set for the query context: `finalizeAggregations: false` and `groupByEnableMultiValueUnnesting: false` + +The following table lists query-time aggregations for SQL-based ingestion: + +|Query-time aggregation|Notes| +|----------------------|-----| +|SUM|Use unchanged at ingest time.| +|MIN|Use unchanged at ingest time.| +|MAX|Use unchanged at ingest time.| +|AVG|Use SUM and COUNT at ingest time. Switch to quotient of SUM at query time.| +|COUNT|Use unchanged at ingest time, but switch to SUM at query time.| +|COUNT(DISTINCT expr)|If approximate, use APPROX_COUNT_DISTINCT at ingest time.

If exact, you cannot use an ingest-time aggregation. Instead, `expr` must be stored as-is. Add it to the SELECT and GROUP BY lists.| +|EARLIEST(expr)

(numeric form)|Not supported.| +|EARLIEST(expr, maxBytes)

(string form)|Use unchanged at ingest time.| +|LATEST(expr)

(numeric form)|Not supported.| +|LATEST(expr, maxBytes)

(string form)|Use unchanged at ingest time.| +|APPROX_COUNT_DISTINCT|Use unchanged at ingest time.| +|APPROX_COUNT_DISTINCT_BUILTIN|Use unchanged at ingest time.| +|APPROX_COUNT_DISTINCT_DS_HLL|Use unchanged at ingest time.| +|APPROX_COUNT_DISTINCT_DS_THETA|Use unchanged at ingest time.| +|APPROX_QUANTILE|Not supported. Deprecated; use APPROX_QUANTILE_DS instead.| +|APPROX_QUANTILE_DS|Use DS_QUANTILES_SKETCH at ingest time. Continue using APPROX_QUANTILE_DS at query time.| +|APPROX_QUANTILE_FIXED_BUCKETS|Not supported.| + +### Multi-value dimensions + +By default, multi-value dimensions are not ingested as expected when rollup is enabled because the +GROUP BY operator unnests them instead of leaving them as arrays. This is [standard behavior](../querying/sql-data-types.md#multi-value-strings) for GROUP BY but it is generally not desirable behavior for ingestion. + +To address this: + +- When using GROUP BY with data from EXTERN, wrap any string type fields from EXTERN that may be + multi-valued in `MV_TO_ARRAY`. +- Set `groupByEnableMultiValueUnnesting: false` in your query context to ensure that all multi-value + strings are properly converted to arrays using `MV_TO_ARRAY`. If any strings aren't + wrapped in `MV_TO_ARRAY`, the query reports an error that includes the message "Encountered + multi-value dimension x that cannot be processed with groupByEnableMultiValueUnnesting set to false." + +For an example, see [INSERT with rollup example query](./msq-example-queries.md#insert-with-rollup). + +### Context parameters + +Context parameters can control things such as how many tasks get launched or what happens if there's a malformed record. + +For a full list of context parameters and how they affect a query, see [Context parameters](./msq-reference.md#context-parameters). + +## Next steps + +* [Understand how the multi-stage query architecture works](./msq-concepts.md) by reading about the concepts behind it and its processes. +* [Explore the Query view](../operations/druid-console.md) to learn about the UI tools that can help you get started. \ No newline at end of file diff --git a/docs/multi-stage-query/msq-api.md b/docs/multi-stage-query/msq-api.md new file mode 100644 index 000000000000..b9c2f165acee --- /dev/null +++ b/docs/multi-stage-query/msq-api.md @@ -0,0 +1,1652 @@ +--- +id: api +title: SQL-based ingestion APIs +sidebar_label: API +--- + + + +> SQL-based ingestion using the multi-stage query task engine is our recommended solution starting in Druid 24.0. Alternative ingestion solutions, such as native batch and Hadoop-based ingestion systems, will still be supported. We recommend you read all [known issues](./msq-known-issues.md) and test the feature in a development environment before rolling it out in production. Using the multi-stage query task engine with `SELECT` statements that do not write to a datasource is experimental. + +The **Query** view in the Druid console provides the most stable experience for the multi-stage query task engine (MSQ task engine) and multi-stage query architecture. Use the UI if you do not need a programmatic interface. + +When using the API for the MSQ task engine, the action you want to take determines the endpoint you use: + +- `/druid/v2/sql/task` endpoint: Submit a query for ingestion. +- `/druid/indexer/v1/task` endpoint: Interact with a query, including getting its status, getting its details, or canceling it. This page describes a few of the Overlord Task APIs that you can use with the MSQ task engine. For information about Druid APIs, see the [API reference for Druid](../operations/api-reference.md#tasks). + +## Submit a query + +You submit queries to the MSQ task engine using the `POST /druid/v2/sql/task/` endpoint. + +### Request + +Currently, the MSQ task engine ignores the provided values of `resultFormat`, `header`, +`typesHeader`, and `sqlTypesHeader`. SQL SELECT queries write out their results into the task report (in the `multiStageQuery.payload.results.results` key) formatted as if `resultFormat` is an `array`. + +For task queries similar to the [example queries](./msq-example-queries.md), you need to escape characters such as quotation marks (") if you use something like `curl`. +You don't need to escape characters if you use a method that can parse JSON seamlessly, such as Python. +The Python example in this topic escapes quotation marks although it's not required. + +The following example is the same query that you submit when you complete [Convert a JSON ingestion spec](./msq-tutorial-convert-ingest-spec.md) where you insert data into a table named `wikipedia`. + + + + + +``` +POST /druid/v2/sql/task +``` + +```json +{ + "query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://static.imply.io/data/wikipedia.json.gz\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY", + "context": { + "maxNumTasks": 3 + } +} +``` + + + +Make sure you replace `username`, `password`, `your-instance`, and `port` with the values for your deployment. + +```bash +curl --location --request POST 'https://:@:/druid/v2/sql/task/' \ +--header 'Content-Type: application/json' \ +--data-raw '{ + "query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '\''{\"type\": \"http\", \"uris\": [\"https://static.imply.io/data/wikipedia.json.gz\"]}'\'',\n '\''{\"type\": \"json\"}'\'',\n '\''[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\''\n )\n)\nPARTITIONED BY DAY", + "context": { + "maxNumTasks": 3 + } +``` + + +Make sure you replace `username`, `password`, `your-instance`, and `port` with the values for your deployment. + +```python +import json +import requests + +url = "https://:@:/druid/v2/sql/task/" + +payload = json.dumps({ + "query": "INSERT INTO wikipedia\nSELECT\n TIME_PARSE(\"timestamp\") AS __time,\n *\nFROM TABLE(\n EXTERN(\n '{\"type\": \"http\", \"uris\": [\"https://static.imply.io/data/wikipedia.json.gz\"]}',\n '{\"type\": \"json\"}',\n '[{\"name\": \"added\", \"type\": \"long\"}, {\"name\": \"channel\", \"type\": \"string\"}, {\"name\": \"cityName\", \"type\": \"string\"}, {\"name\": \"comment\", \"type\": \"string\"}, {\"name\": \"commentLength\", \"type\": \"long\"}, {\"name\": \"countryIsoCode\", \"type\": \"string\"}, {\"name\": \"countryName\", \"type\": \"string\"}, {\"name\": \"deleted\", \"type\": \"long\"}, {\"name\": \"delta\", \"type\": \"long\"}, {\"name\": \"deltaBucket\", \"type\": \"string\"}, {\"name\": \"diffUrl\", \"type\": \"string\"}, {\"name\": \"flags\", \"type\": \"string\"}, {\"name\": \"isAnonymous\", \"type\": \"string\"}, {\"name\": \"isMinor\", \"type\": \"string\"}, {\"name\": \"isNew\", \"type\": \"string\"}, {\"name\": \"isRobot\", \"type\": \"string\"}, {\"name\": \"isUnpatrolled\", \"type\": \"string\"}, {\"name\": \"metroCode\", \"type\": \"string\"}, {\"name\": \"namespace\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"regionIsoCode\", \"type\": \"string\"}, {\"name\": \"regionName\", \"type\": \"string\"}, {\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n )\n)\nPARTITIONED BY DAY", + "context": { + "maxNumTasks": 3 + } +}) +headers = { + 'Content-Type': 'application/json' +} + +response = requests.request("POST", url, headers=headers, data=payload) + +print(response.text) + +``` + + + + +### Response + +```json +{ + "taskId": "query-f795a235-4dc7-4fef-abac-3ae3f9686b79", + "state": "RUNNING", +} +``` + +**Response fields** + +|Field|Description| +|-----|-----------| +| taskId | Controller task ID. You can use Druid's standard [task APIs](../operations/api-reference.md#overlord) to interact with this controller task.| +| state | Initial state for the query, which is "RUNNING".| + + +## Get the payload for a query task + +You can retrieve basic information about a query task, such as the SQL query and context parameters that were submitted. + +### Request + + + + + +``` +GET /druid/indexer/v1/task/ +``` + + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +```bash +curl --location --request GET 'https://:@:/druid/indexer/v1/task/' +``` + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +```python +import requests + +url = ":@:/druid/indexer/v1/task/" + +payload={} +headers = {} + +response = requests.request("GET", url, headers=headers, data=payload) + +print(response.text) + +``` + + + +### Response + +
Show the response + +``` +{ + "task": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "payload": { + "type": "query_controller", + "id": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "spec": { + "query": { + "queryType": "scan", + "dataSource": { + "type": "external", + "inputSource": { + "type": "http", + "uris": [ + "https://static.imply.io/data/kttm/kttm-v2-2019-08-25.json.gz" + ], + "httpAuthenticationUsername": null, + "httpAuthenticationPassword": null + }, + "inputFormat": { + "type": "json", + "flattenSpec": null, + "featureSpec": {}, + "keepNullColumns": false + }, + "signature": [ + { + "name": "timestamp", + "type": "STRING" + }, + { + "name": "agent_category", + "type": "STRING" + }, + { + "name": "agent_type", + "type": "STRING" + }, + { + "name": "browser", + "type": "STRING" + }, + { + "name": "browser_version", + "type": "STRING" + }, + { + "name": "city", + "type": "STRING" + }, + { + "name": "continent", + "type": "STRING" + }, + { + "name": "country", + "type": "STRING" + }, + { + "name": "version", + "type": "STRING" + }, + { + "name": "event_type", + "type": "STRING" + }, + { + "name": "event_subtype", + "type": "STRING" + }, + { + "name": "loaded_image", + "type": "STRING" + }, + { + "name": "adblock_list", + "type": "STRING" + }, + { + "name": "forwarded_for", + "type": "STRING" + }, + { + "name": "language", + "type": "STRING" + }, + { + "name": "number", + "type": "LONG" + }, + { + "name": "os", + "type": "STRING" + }, + { + "name": "path", + "type": "STRING" + }, + { + "name": "platform", + "type": "STRING" + }, + { + "name": "referrer", + "type": "STRING" + }, + { + "name": "referrer_host", + "type": "STRING" + }, + { + "name": "region", + "type": "STRING" + }, + { + "name": "remote_address", + "type": "STRING" + }, + { + "name": "screen", + "type": "STRING" + }, + { + "name": "session", + "type": "STRING" + }, + { + "name": "session_length", + "type": "LONG" + }, + { + "name": "timezone", + "type": "STRING" + }, + { + "name": "timezone_offset", + "type": "LONG" + }, + { + "name": "window", + "type": "STRING" + } + ] + }, + "intervals": { + "type": "intervals", + "intervals": [ + "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" + ] + }, + "resultFormat": "compactedList", + "columns": [ + "adblock_list", + "agent_category", + "agent_type", + "browser", + "browser_version", + "city", + "continent", + "country", + "event_subtype", + "event_type", + "forwarded_for", + "language", + "loaded_image", + "number", + "os", + "path", + "platform", + "referrer", + "referrer_host", + "region", + "remote_address", + "screen", + "session", + "session_length", + "timestamp", + "timezone", + "timezone_offset", + "version", + "window" + ], + "legacy": false, + "context": { + "finalize": true, + "msqMaxNumTasks": 3, + "msqSignature": "[{\"name\":\"adblock_list\",\"type\":\"STRING\"},{\"name\":\"agent_category\",\"type\":\"STRING\"},{\"name\":\"agent_type\",\"type\":\"STRING\"},{\"name\":\"browser\",\"type\":\"STRING\"},{\"name\":\"browser_version\",\"type\":\"STRING\"},{\"name\":\"city\",\"type\":\"STRING\"},{\"name\":\"continent\",\"type\":\"STRING\"},{\"name\":\"country\",\"type\":\"STRING\"},{\"name\":\"event_subtype\",\"type\":\"STRING\"},{\"name\":\"event_type\",\"type\":\"STRING\"},{\"name\":\"forwarded_for\",\"type\":\"STRING\"},{\"name\":\"language\",\"type\":\"STRING\"},{\"name\":\"loaded_image\",\"type\":\"STRING\"},{\"name\":\"number\",\"type\":\"LONG\"},{\"name\":\"os\",\"type\":\"STRING\"},{\"name\":\"path\",\"type\":\"STRING\"},{\"name\":\"platform\",\"type\":\"STRING\"},{\"name\":\"referrer\",\"type\":\"STRING\"},{\"name\":\"referrer_host\",\"type\":\"STRING\"},{\"name\":\"region\",\"type\":\"STRING\"},{\"name\":\"remote_address\",\"type\":\"STRING\"},{\"name\":\"screen\",\"type\":\"STRING\"},{\"name\":\"session\",\"type\":\"STRING\"},{\"name\":\"session_length\",\"type\":\"LONG\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"timezone\",\"type\":\"STRING\"},{\"name\":\"timezone_offset\",\"type\":\"LONG\"},{\"name\":\"version\",\"type\":\"STRING\"},{\"name\":\"window\",\"type\":\"STRING\"}]", + "multiStageQuery": true, + "sqlInsertSegmentGranularity": "{\"type\":\"all\"}", + "sqlQueryId": "a6b65442-f77e-44e4-af28-ab3b711a27ac", + "sqlReplaceTimeChunks": "all" + }, + "granularity": { + "type": "all" + } + }, + "columnMappings": [ + { + "queryColumn": "timestamp", + "outputColumn": "timestamp" + }, + { + "queryColumn": "agent_category", + "outputColumn": "agent_category" + }, + { + "queryColumn": "agent_type", + "outputColumn": "agent_type" + }, + { + "queryColumn": "browser", + "outputColumn": "browser" + }, + { + "queryColumn": "browser_version", + "outputColumn": "browser_version" + }, + { + "queryColumn": "city", + "outputColumn": "city" + }, + { + "queryColumn": "continent", + "outputColumn": "continent" + }, + { + "queryColumn": "country", + "outputColumn": "country" + }, + { + "queryColumn": "version", + "outputColumn": "version" + }, + { + "queryColumn": "event_type", + "outputColumn": "event_type" + }, + { + "queryColumn": "event_subtype", + "outputColumn": "event_subtype" + }, + { + "queryColumn": "loaded_image", + "outputColumn": "loaded_image" + }, + { + "queryColumn": "adblock_list", + "outputColumn": "adblock_list" + }, + { + "queryColumn": "forwarded_for", + "outputColumn": "forwarded_for" + }, + { + "queryColumn": "language", + "outputColumn": "language" + }, + { + "queryColumn": "number", + "outputColumn": "number" + }, + { + "queryColumn": "os", + "outputColumn": "os" + }, + { + "queryColumn": "path", + "outputColumn": "path" + }, + { + "queryColumn": "platform", + "outputColumn": "platform" + }, + { + "queryColumn": "referrer", + "outputColumn": "referrer" + }, + { + "queryColumn": "referrer_host", + "outputColumn": "referrer_host" + }, + { + "queryColumn": "region", + "outputColumn": "region" + }, + { + "queryColumn": "remote_address", + "outputColumn": "remote_address" + }, + { + "queryColumn": "screen", + "outputColumn": "screen" + }, + { + "queryColumn": "session", + "outputColumn": "session" + }, + { + "queryColumn": "session_length", + "outputColumn": "session_length" + }, + { + "queryColumn": "timezone", + "outputColumn": "timezone" + }, + { + "queryColumn": "timezone_offset", + "outputColumn": "timezone_offset" + }, + { + "queryColumn": "window", + "outputColumn": "window" + } + ], + "destination": { + "type": "dataSource", + "dataSource": "kttm_simple", + "segmentGranularity": { + "type": "all" + }, + "replaceTimeChunks": [ + "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" + ] + }, + "assignmentStrategy": "max", + "tuningConfig": { + "type": "index_parallel", + "maxRowsPerSegment": 3000000, + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 100000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxTotalRows": null, + "numShards": null, + "splitHintSpec": null, + "partitionsSpec": { + "type": "dynamic", + "maxRowsPerSegment": 3000000, + "maxTotalRows": null + }, + "indexSpec": { + "bitmap": { + "type": "roaring", + "compressRunOnSerialization": true + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs", + "segmentLoader": null + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring", + "compressRunOnSerialization": true + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs", + "segmentLoader": null + }, + "maxPendingPersists": 0, + "forceGuaranteedRollup": false, + "reportParseExceptions": false, + "pushTimeout": 0, + "segmentWriteOutMediumFactory": null, + "maxNumConcurrentSubTasks": 2, + "maxRetry": 1, + "taskStatusCheckPeriodMs": 1000, + "chatHandlerTimeout": "PT10S", + "chatHandlerNumRetries": 5, + "maxNumSegmentsToMerge": 100, + "totalNumMergeTasks": 10, + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "maxColumnsToMerge": -1, + "awaitSegmentAvailabilityTimeoutMillis": 0, + "maxAllowedLockCount": -1, + "partitionDimensions": [] + } + }, + "sqlQuery": "REPLACE INTO \"kttm_simple\" OVERWRITE ALL\nSELECT *\nFROM TABLE(\n EXTERN(\n '{\"type\":\"http\",\"uris\":[\"https://static.imply.io/data/kttm/kttm-v2-2019-08-25.json.gz\"]}',\n '{\"type\":\"json\"}',\n '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"agent_category\",\"type\":\"string\"},{\"name\":\"agent_type\",\"type\":\"string\"},{\"name\":\"browser\",\"type\":\"string\"},{\"name\":\"browser_version\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"continent\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"event_type\",\"type\":\"string\"},{\"name\":\"event_subtype\",\"type\":\"string\"},{\"name\":\"loaded_image\",\"type\":\"string\"},{\"name\":\"adblock_list\",\"type\":\"string\"},{\"name\":\"forwarded_for\",\"type\":\"string\"},{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"os\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"},{\"name\":\"platform\",\"type\":\"string\"},{\"name\":\"referrer\",\"type\":\"string\"},{\"name\":\"referrer_host\",\"type\":\"string\"},{\"name\":\"region\",\"type\":\"string\"},{\"name\":\"remote_address\",\"type\":\"string\"},{\"name\":\"screen\",\"type\":\"string\"},{\"name\":\"session\",\"type\":\"string\"},{\"name\":\"session_length\",\"type\":\"long\"},{\"name\":\"timezone\",\"type\":\"string\"},{\"name\":\"timezone_offset\",\"type\":\"long\"},{\"name\":\"window\",\"type\":\"string\"}]'\n )\n)\nPARTITIONED BY ALL TIME", + "sqlQueryContext": { + "parseExceptions": 0, + "maxNumTasks": 3, + "signature": "[{\"name\":\"adblock_list\",\"type\":\"STRING\"},{\"name\":\"agent_category\",\"type\":\"STRING\"},{\"name\":\"agent_type\",\"type\":\"STRING\"},{\"name\":\"browser\",\"type\":\"STRING\"},{\"name\":\"browser_version\",\"type\":\"STRING\"},{\"name\":\"city\",\"type\":\"STRING\"},{\"name\":\"continent\",\"type\":\"STRING\"},{\"name\":\"country\",\"type\":\"STRING\"},{\"name\":\"event_subtype\",\"type\":\"STRING\"},{\"name\":\"event_type\",\"type\":\"STRING\"},{\"name\":\"forwarded_for\",\"type\":\"STRING\"},{\"name\":\"language\",\"type\":\"STRING\"},{\"name\":\"loaded_image\",\"type\":\"STRING\"},{\"name\":\"number\",\"type\":\"LONG\"},{\"name\":\"os\",\"type\":\"STRING\"},{\"name\":\"path\",\"type\":\"STRING\"},{\"name\":\"platform\",\"type\":\"STRING\"},{\"name\":\"referrer\",\"type\":\"STRING\"},{\"name\":\"referrer_host\",\"type\":\"STRING\"},{\"name\":\"region\",\"type\":\"STRING\"},{\"name\":\"remote_address\",\"type\":\"STRING\"},{\"name\":\"screen\",\"type\":\"STRING\"},{\"name\":\"session\",\"type\":\"STRING\"},{\"name\":\"session_length\",\"type\":\"LONG\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"timezone\",\"type\":\"STRING\"},{\"name\":\"timezone_offset\",\"type\":\"LONG\"},{\"name\":\"version\",\"type\":\"STRING\"},{\"name\":\"window\",\"type\":\"STRING\"}]", + "multiStageQuery": true, + "sqlInsertSegmentGranularity": "{\"type\":\"all\"}", + "sqlQueryId": "a6b65442-f77e-44e4-af28-ab3b711a27ac", + "sqlReplaceTimeChunks": "all" + }, + "sqlTypeNames": [ + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "BIGINT", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "VARCHAR", + "BIGINT", + "VARCHAR", + "BIGINT", + "VARCHAR" + ], + "context": { + "forceTimeChunkLock": true, + "useLineageBasedSegmentAllocation": true + }, + "groupId": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "dataSource": "kttm_simple", + "resource": { + "availabilityGroup": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "requiredCapacity": 1 + } + } +} +``` + +
+ +## Get the status for a query task + +You can retrieve status of a query to see if it is still running, completed successfully, failed, or got canceled. + +### Request + + + + + +``` +GET /druid/indexer/v1/task/ +``` + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +```bash +curl --location --request GET 'https://:@:/druid/indexer/v1/task//status' +``` + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +```python +import requests + +url = "https://:@:/druid/indexer/v1/task//status" + +payload={} +headers = {} + +response = requests.request("GET", url, headers=headers, data=payload) + +print(response.text) +``` + + + +### Response + +``` +{ + "task": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "status": { + "id": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "groupId": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "type": "query_controller", + "createdTime": "2022-07-27T20:09:23.551Z", + "queueInsertionTime": "1970-01-01T00:00:00.000Z", + "statusCode": "SUCCESS", + "status": "SUCCESS", + "runnerStatusCode": "WAITING", + "duration": 136636, + "location": { + "host": "ip-10-201-5-81.ec2.internal", + "port": -1, + "tlsPort": 8100 + }, + "dataSource": "kttm_simple", + "errorMsg": null + } +} +``` + +## Get the report for a query task + +A report provides detailed information about a query task, including things like the stages, warnings, and errors. + +Keep the following in mind when using the task API to view reports: + +- For SELECT queries, the report includes the results. At this time, if you want to view results for SELECT queries, you need to retrieve them as a generic map from the report and extract the results. +- The task report stores query details for controller tasks. +- If you encounter `500 Server Error` or `404 Not Found` errors, the task may be in the process of starting up or shutting down. + +For an explanation of the fields in a report, see [Report response fields](#report-response-fields). + +### Request + + + + + +``` +GET /druid/indexer/v1/task//report +``` + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +```bash +curl --location --request GET 'https://:@:/druid/indexer/v1/task//report' +``` + + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +```python +import requests + +url = "https://:@:/druid/indexer/v1/task//reports" + +payload={} +headers = {} + +response = requests.request("GET", url, headers=headers, data=payload) + +print(response.text) +``` + + + + +### Response + +The response shows an example report for a query. + +
Show the response + +```json +{ + "multiStageQuery": { + "taskId": "query-a6b65442-f77e-44e4-af28-ab3b711a27ac", + "payload": { + "status": { + "status": "SUCCESS", + "startTime": "2022-07-27T20:09:39.915Z", + "durationMs": 116516, + "warningReports": [] + }, + "stages": [ + { + "stageNumber": 0, + "definition": { + "id": "f224410f-1cad-4ee7-b10d-f10ddf8bb517_0", + "input": [ + { + "type": "external", + "inputSource": { + "type": "http", + "uris": [ + "https://static.imply.io/data/kttm/kttm-v2-2019-08-25.json.gz" + ], + "httpAuthenticationUsername": null, + "httpAuthenticationPassword": null + }, + "inputFormat": { + "type": "json", + "flattenSpec": null, + "featureSpec": {}, + "keepNullColumns": false + }, + "signature": [ + { + "name": "timestamp", + "type": "STRING" + }, + { + "name": "agent_category", + "type": "STRING" + }, + { + "name": "agent_type", + "type": "STRING" + }, + { + "name": "browser", + "type": "STRING" + }, + { + "name": "browser_version", + "type": "STRING" + }, + { + "name": "city", + "type": "STRING" + }, + { + "name": "continent", + "type": "STRING" + }, + { + "name": "country", + "type": "STRING" + }, + { + "name": "version", + "type": "STRING" + }, + { + "name": "event_type", + "type": "STRING" + }, + { + "name": "event_subtype", + "type": "STRING" + }, + { + "name": "loaded_image", + "type": "STRING" + }, + { + "name": "adblock_list", + "type": "STRING" + }, + { + "name": "forwarded_for", + "type": "STRING" + }, + { + "name": "language", + "type": "STRING" + }, + { + "name": "number", + "type": "LONG" + }, + { + "name": "os", + "type": "STRING" + }, + { + "name": "path", + "type": "STRING" + }, + { + "name": "platform", + "type": "STRING" + }, + { + "name": "referrer", + "type": "STRING" + }, + { + "name": "referrer_host", + "type": "STRING" + }, + { + "name": "region", + "type": "STRING" + }, + { + "name": "remote_address", + "type": "STRING" + }, + { + "name": "screen", + "type": "STRING" + }, + { + "name": "session", + "type": "STRING" + }, + { + "name": "session_length", + "type": "LONG" + }, + { + "name": "timezone", + "type": "STRING" + }, + { + "name": "timezone_offset", + "type": "LONG" + }, + { + "name": "window", + "type": "STRING" + } + ] + } + ], + "processor": { + "type": "scan", + "query": { + "queryType": "scan", + "dataSource": { + "type": "inputNumber", + "inputNumber": 0 + }, + "intervals": { + "type": "intervals", + "intervals": [ + "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" + ] + }, + "resultFormat": "compactedList", + "columns": [ + "adblock_list", + "agent_category", + "agent_type", + "browser", + "browser_version", + "city", + "continent", + "country", + "event_subtype", + "event_type", + "forwarded_for", + "language", + "loaded_image", + "number", + "os", + "path", + "platform", + "referrer", + "referrer_host", + "region", + "remote_address", + "screen", + "session", + "session_length", + "timestamp", + "timezone", + "timezone_offset", + "version", + "window" + ], + "legacy": false, + "context": { + "finalize": true, + "msqMaxNumTasks": 3, + "msqSignature": "[{\"name\":\"adblock_list\",\"type\":\"STRING\"},{\"name\":\"agent_category\",\"type\":\"STRING\"},{\"name\":\"agent_type\",\"type\":\"STRING\"},{\"name\":\"browser\",\"type\":\"STRING\"},{\"name\":\"browser_version\",\"type\":\"STRING\"},{\"name\":\"city\",\"type\":\"STRING\"},{\"name\":\"continent\",\"type\":\"STRING\"},{\"name\":\"country\",\"type\":\"STRING\"},{\"name\":\"event_subtype\",\"type\":\"STRING\"},{\"name\":\"event_type\",\"type\":\"STRING\"},{\"name\":\"forwarded_for\",\"type\":\"STRING\"},{\"name\":\"language\",\"type\":\"STRING\"},{\"name\":\"loaded_image\",\"type\":\"STRING\"},{\"name\":\"number\",\"type\":\"LONG\"},{\"name\":\"os\",\"type\":\"STRING\"},{\"name\":\"path\",\"type\":\"STRING\"},{\"name\":\"platform\",\"type\":\"STRING\"},{\"name\":\"referrer\",\"type\":\"STRING\"},{\"name\":\"referrer_host\",\"type\":\"STRING\"},{\"name\":\"region\",\"type\":\"STRING\"},{\"name\":\"remote_address\",\"type\":\"STRING\"},{\"name\":\"screen\",\"type\":\"STRING\"},{\"name\":\"session\",\"type\":\"STRING\"},{\"name\":\"session_length\",\"type\":\"LONG\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"timezone\",\"type\":\"STRING\"},{\"name\":\"timezone_offset\",\"type\":\"LONG\"},{\"name\":\"version\",\"type\":\"STRING\"},{\"name\":\"window\",\"type\":\"STRING\"}]", + "multiStageQuery": true, + "sqlInsertSegmentGranularity": "{\"type\":\"all\"}", + "sqlQueryId": "a6b65442-f77e-44e4-af28-ab3b711a27ac", + "sqlReplaceTimeChunks": "all" + }, + "granularity": { + "type": "all" + } + } + }, + "signature": [ + { + "name": "__boost", + "type": "LONG" + }, + { + "name": "adblock_list", + "type": "STRING" + }, + { + "name": "agent_category", + "type": "STRING" + }, + { + "name": "agent_type", + "type": "STRING" + }, + { + "name": "browser", + "type": "STRING" + }, + { + "name": "browser_version", + "type": "STRING" + }, + { + "name": "city", + "type": "STRING" + }, + { + "name": "continent", + "type": "STRING" + }, + { + "name": "country", + "type": "STRING" + }, + { + "name": "event_subtype", + "type": "STRING" + }, + { + "name": "event_type", + "type": "STRING" + }, + { + "name": "forwarded_for", + "type": "STRING" + }, + { + "name": "language", + "type": "STRING" + }, + { + "name": "loaded_image", + "type": "STRING" + }, + { + "name": "number", + "type": "LONG" + }, + { + "name": "os", + "type": "STRING" + }, + { + "name": "path", + "type": "STRING" + }, + { + "name": "platform", + "type": "STRING" + }, + { + "name": "referrer", + "type": "STRING" + }, + { + "name": "referrer_host", + "type": "STRING" + }, + { + "name": "region", + "type": "STRING" + }, + { + "name": "remote_address", + "type": "STRING" + }, + { + "name": "screen", + "type": "STRING" + }, + { + "name": "session", + "type": "STRING" + }, + { + "name": "session_length", + "type": "LONG" + }, + { + "name": "timestamp", + "type": "STRING" + }, + { + "name": "timezone", + "type": "STRING" + }, + { + "name": "timezone_offset", + "type": "LONG" + }, + { + "name": "version", + "type": "STRING" + }, + { + "name": "window", + "type": "STRING" + } + ], + "shuffleSpec": { + "type": "targetSize", + "clusterBy": { + "columns": [ + { + "columnName": "__boost" + } + ] + }, + "targetSize": 3000000, + "aggregate": false + }, + "maxWorkerCount": 2, + "shuffleCheckHasMultipleValues": true + }, + "phase": "FINISHED", + "workerCount": 1, + "partitionCount": 1, + "startTime": "2022-07-27T20:09:43.168Z", + "duration": 62837, + "sort": true + }, + { + "stageNumber": 1, + "definition": { + "id": "f224410f-1cad-4ee7-b10d-f10ddf8bb517_1", + "input": [ + { + "type": "stage", + "stage": 0 + } + ], + "processor": { + "type": "segmentGenerator", + "dataSchema": { + "dataSource": "kttm_simple", + "timestampSpec": { + "column": "__time", + "format": "millis", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "timestamp", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "agent_category", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "agent_type", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "browser", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "browser_version", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "city", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "continent", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "country", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "version", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "event_type", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "event_subtype", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "loaded_image", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "adblock_list", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "forwarded_for", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "language", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "number", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "os", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "path", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "platform", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "referrer", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "referrer_host", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "region", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "remote_address", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "screen", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "session", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "session_length", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "timezone", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "timezone_offset", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "window", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "arbitrary", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [ + "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" + ] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "columnMappings": [ + { + "queryColumn": "timestamp", + "outputColumn": "timestamp" + }, + { + "queryColumn": "agent_category", + "outputColumn": "agent_category" + }, + { + "queryColumn": "agent_type", + "outputColumn": "agent_type" + }, + { + "queryColumn": "browser", + "outputColumn": "browser" + }, + { + "queryColumn": "browser_version", + "outputColumn": "browser_version" + }, + { + "queryColumn": "city", + "outputColumn": "city" + }, + { + "queryColumn": "continent", + "outputColumn": "continent" + }, + { + "queryColumn": "country", + "outputColumn": "country" + }, + { + "queryColumn": "version", + "outputColumn": "version" + }, + { + "queryColumn": "event_type", + "outputColumn": "event_type" + }, + { + "queryColumn": "event_subtype", + "outputColumn": "event_subtype" + }, + { + "queryColumn": "loaded_image", + "outputColumn": "loaded_image" + }, + { + "queryColumn": "adblock_list", + "outputColumn": "adblock_list" + }, + { + "queryColumn": "forwarded_for", + "outputColumn": "forwarded_for" + }, + { + "queryColumn": "language", + "outputColumn": "language" + }, + { + "queryColumn": "number", + "outputColumn": "number" + }, + { + "queryColumn": "os", + "outputColumn": "os" + }, + { + "queryColumn": "path", + "outputColumn": "path" + }, + { + "queryColumn": "platform", + "outputColumn": "platform" + }, + { + "queryColumn": "referrer", + "outputColumn": "referrer" + }, + { + "queryColumn": "referrer_host", + "outputColumn": "referrer_host" + }, + { + "queryColumn": "region", + "outputColumn": "region" + }, + { + "queryColumn": "remote_address", + "outputColumn": "remote_address" + }, + { + "queryColumn": "screen", + "outputColumn": "screen" + }, + { + "queryColumn": "session", + "outputColumn": "session" + }, + { + "queryColumn": "session_length", + "outputColumn": "session_length" + }, + { + "queryColumn": "timezone", + "outputColumn": "timezone" + }, + { + "queryColumn": "timezone_offset", + "outputColumn": "timezone_offset" + }, + { + "queryColumn": "window", + "outputColumn": "window" + } + ], + "tuningConfig": { + "type": "index_parallel", + "maxRowsPerSegment": 3000000, + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 100000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxTotalRows": null, + "numShards": null, + "splitHintSpec": null, + "partitionsSpec": { + "type": "dynamic", + "maxRowsPerSegment": 3000000, + "maxTotalRows": null + }, + "indexSpec": { + "bitmap": { + "type": "roaring", + "compressRunOnSerialization": true + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs", + "segmentLoader": null + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring", + "compressRunOnSerialization": true + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs", + "segmentLoader": null + }, + "maxPendingPersists": 0, + "forceGuaranteedRollup": false, + "reportParseExceptions": false, + "pushTimeout": 0, + "segmentWriteOutMediumFactory": null, + "maxNumConcurrentSubTasks": 2, + "maxRetry": 1, + "taskStatusCheckPeriodMs": 1000, + "chatHandlerTimeout": "PT10S", + "chatHandlerNumRetries": 5, + "maxNumSegmentsToMerge": 100, + "totalNumMergeTasks": 10, + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "maxColumnsToMerge": -1, + "awaitSegmentAvailabilityTimeoutMillis": 0, + "maxAllowedLockCount": -1, + "partitionDimensions": [] + } + }, + "signature": [], + "maxWorkerCount": 2 + }, + "phase": "FINISHED", + "workerCount": 1, + "partitionCount": 1, + "startTime": "2022-07-27T20:10:45.840Z", + "duration": 50590 + } + ], + "counters": { + "0": { + "0": { + "input0": { + "type": "channel", + "rows": [ + 465346 + ], + "files": [ + 1 + ], + "totalFiles": [ + 1 + ] + }, + "output": { + "type": "channel", + "rows": [ + 465346 + ], + "bytes": [ + 267146161 + ], + "frames": [ + 42 + ] + }, + "sort": { + "type": "channel", + "rows": [ + 465346 + ], + "bytes": [ + 265300383 + ], + "frames": [ + 501 + ] + }, + "sortProgress": { + "type": "sortProgress", + "totalMergingLevels": 3, + "levelToTotalBatches": { + "0": 9, + "1": 2, + "2": 1 + }, + "levelToMergedBatches": { + "0": 9, + "1": 2, + "2": 1 + }, + "totalMergersForUltimateLevel": 1, + "progressDigest": 1.0 + } + } + }, + "1": { + "0": { + "input0": { + "type": "channel", + "rows": [ + 465346 + ], + "bytes": [ + 265300383 + ], + "frames": [ + 501 + ] + } + } + } + } + } + } +} +``` + +### Report response fields + +The following table describes the response fields when you retrieve a report for a MSQ task engine using the `/druid/indexer/v1/task//report` endpoint: + +|Field|Description| +|-----|-----------| +|multiStageQuery.taskId|Controller task ID.| +|multiStageQuery.payload.status|Query status container.| +|multiStageQuery.payload.status.status|RUNNING, SUCCESS, or FAILED.| +|multiStageQuery.payload.status.startTime|Start time of the query in ISO format. Only present if the query has started running.| +|multiStageQuery.payload.status.durationMs|Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet.| +|multiStageQuery.payload.status.errorReport|Error object. Only present if there was an error.| +|multiStageQuery.payload.status.errorReport.taskId|The task that reported the error, if known. May be a controller task or a worker task.| +|multiStageQuery.payload.status.errorReport.host|The hostname and port of the task that reported the error, if known.| +|multiStageQuery.payload.status.errorReport.stageNumber|The stage number that reported the error, if it happened during execution of a specific stage.| +|multiStageQuery.payload.status.errorReport.error|Error object. Contains `errorCode` at a minimum, and may contain other fields as described in the [error code table](./msq-concepts.md#error-codes). Always present if there is an error.| +|multiStageQuery.payload.status.errorReport.error.errorCode|One of the error codes from the [error code table](./msq-concepts.md#error-codes). Always present if there is an error.| +|multiStageQuery.payload.status.errorReport.error.errorMessage|User-friendly error message. Not always present, even if there is an error.| +|multiStageQuery.payload.status.errorReport.exceptionStackTrace|Java stack trace in string form, if the error was due to a server-side exception.| +|multiStageQuery.payload.stages|Array of query stages.| +|multiStageQuery.payload.stages[].stageNumber|Each stage has a number that differentiates it from other stages.| +|multiStageQuery.payload.stages[].phase|Either NEW, READING_INPUT, POST_READING, RESULTS_COMPLETE, or FAILED. Only present if the stage has started.| +|multiStageQuery.payload.stages[].workerCount|Number of parallel tasks that this stage is running on. Only present if the stage has started.| +|multiStageQuery.payload.stages[].partitionCount|Number of output partitions generated by this stage. Only present if the stage has started and has computed its number of output partitions.| +|multiStageQuery.payload.stages[].startTime|Start time of this stage. Only present if the stage has started.| +|multiStageQuery.payload.stages[].duration|The number of milliseconds that the stage has been running. Only present if the stage has started.| +|multiStageQuery.payload.stages[].sort|A boolean that is set to `true` if the stage does a sort as part of its execution.| +|multiStageQuery.payload.stages[].definition|The object defining what the stage does.| +|multiStageQuery.payload.stages[].definition.id|The unique identifier of the stage.| +|multiStageQuery.payload.stages[].definition.input|Array of inputs that the stage has.| +|multiStageQuery.payload.stages[].definition.broadcast|Array of input indexes that get broadcasted. Only present if there are inputs that get broadcasted.| +|multiStageQuery.payload.stages[].definition.processor|An object defining the processor logic.| +|multiStageQuery.payload.stages[].definition.signature|The output signature of the stage.| + +## Cancel a query task + +### Request + + + + + +``` +POST /druid/indexer/v1/task//shutdown +``` + + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +```bash +curl --location --request POST 'https://:@:/druid/indexer/v1/task//shutdown' +``` + + + +Make sure you replace `username`, `password`, `your-instance`, `port`, and `taskId` with the values for your deployment. + +``` +import requests + +url = "https://:@:/druid/indexer/v1/task//shutdown" + +payload={} +headers = {} + +response = requests.request("POST", url, headers=headers, data=payload) + +print(response.text) +``` + + + +### Response + +``` +{ + "task": "query-655efe33-781a-4c50-ae84-c2911b42d63c" +} +``` diff --git a/docs/multi-stage-query/msq-concepts.md b/docs/multi-stage-query/msq-concepts.md new file mode 100644 index 000000000000..4f1cf7ce1db6 --- /dev/null +++ b/docs/multi-stage-query/msq-concepts.md @@ -0,0 +1,168 @@ +--- +id: concepts +title: SQL-based ingestion concepts +sidebar_label: Key concepts +--- + + + +> SQL-based ingestion using the multi-stage query task engine is our recommended solution starting in Druid 24.0. Alternative ingestion solutions, such as native batch and Hadoop-based ingestion systems, will still be supported. We recommend you read all [known issues](./msq-known-issues.md) and test the feature in a development environment before rolling it out in production. Using the multi-stage query task engine with `SELECT` statements that do not write to a datasource is experimental. + +This topic covers the main concepts and terminology of the multi-stage query architecture. + +## Vocabulary + +You might see the following terms in the documentation or while you're using the multi-stage query architecture and task engine, such as when you view the report for a query: + +- **Controller**: An indexing service task of type `query_controller` that manages + the execution of a query. There is one controller task per query. + +- **Worker**: Indexing service tasks of type `query_worker` that execute a + query. There can be multiple worker tasks per query. Internally, + the tasks process items in parallel using their processing pools (up to `druid.processing.numThreads` of execution parallelism + within a worker task). + +- **Stage**: A stage of query execution that is parallelized across + worker tasks. Workers exchange data with each other between stages. + +- **Partition**: A slice of data output by worker tasks. In INSERT or REPLACE + queries, the partitions of the final stage become Druid segments. + +- **Shuffle**: Workers exchange data between themselves on a per-partition basis in a process called + shuffling. During a shuffle, each output partition is sorted by a clustering key. + +## How the MSQ task engine works + +Query tasks, specifically queries for INSERT, REPLACE, and SELECT, execute using indexing service tasks. Every query occupies at least two task slots while running. + +When you submit a query task to the MSQ task engine, the following happens: + +1. The Broker plans your SQL query into a native query, as usual. + +2. The Broker wraps the native query into a task of type `query_controller` + and submits it to the indexing service. + +3. The Broker returns the task ID to you and exits. + +4. The controller task launches some number of worker tasks determined by + the `maxNumTasks` and `taskAssignment` [context parameters](./msq-reference.md#context-parameters). You can set these settings individually for each query. + +5. The worker tasks execute the query. + +6. If the query is a SELECT query, the worker tasks send the results + back to the controller task, which writes them into its task report. + If the query is an INSERT or REPLACE query, the worker tasks generate and + publish new Druid segments to the provided datasource. + + +## Parallelism + +Parallelism affects performance. + +The [`maxNumTasks`](./msq-reference.md#context-parameters) query parameter determines the maximum number of tasks (workers and one controller) your query will use. Generally, queries perform better with more workers. The lowest possible value of `maxNumTasks` is two (one worker and one controller), and the highest possible value is equal to the number of free task slots in your cluster. + +The `druid.worker.capacity` server property on each Middle Manager determines the maximum number +of worker tasks that can run on each server at once. Worker tasks run single-threaded, which +also determines the maximum number of processors on the server that can contribute towards +multi-stage queries. Since data servers are shared between Historicals and +Middle Managers, the default setting for `druid.worker.capacity` is lower than the number of +processors on the server. Advanced users may consider enhancing parallelism by increasing this +value to one less than the number of processors on the server. In most cases, this increase must +be accompanied by an adjustment of the memory allotment of the Historical process, +Middle-Manager-launched tasks, or both, to avoid memory overcommitment and server instability. If +you are not comfortable tuning these memory usage parameters to avoid overcommitment, it is best +to stick with the default `druid.worker.capacity`. + +## Memory usage + +Increasing the amount of available memory can improve performance as follows: + +- Segment generation becomes more efficient when data doesn't spill to disk as often. +- Sorting stage output data becomes more efficient since available memory affects the + number of required sorting passes. + +Worker tasks use both JVM heap memory and off-heap ("direct") memory. + +On Peons launched by Middle Managers, the bulk of the JVM heap (75%) is split up into two bundles of equal size: one processor bundle and one worker bundle. Each one comprises 37.5% of the available JVM heap. + +The processor memory bundle is used for query processing and segment generation. Each processor bundle must +also provides space to buffer I/O between stages. Specifically, each downstream stage requires 1 MB of buffer space for +each upstream worker. For example, if you have 100 workers running in stage 0, and stage 1 reads from stage 0, +then each worker in stage 1 requires 1M * 100 = 100 MB of memory for frame buffers. + +The worker memory bundle is used for sorting stage output data prior to shuffle. Workers can sort +more data than fits in memory; in this case, they will switch to using disk. + +Worker tasks also use off-heap ("direct") memory. Set the amount of direct +memory available (`-XX:MaxDirectMemorySize`) to at least +`(druid.processing.numThreads + 1) * druid.processing.buffer.sizeBytes`. Increasing the +amount of direct memory available beyond the minimum does not speed up processing. + +It may be necessary to override one or more memory-related parameters if you run into one of the [known issues around memory usage](./msq-known-issues.md#memory-usage). + +## Limits + +Knowing the limits for the MSQ task engine can help you troubleshoot any [errors](#error-codes) that you encounter. Many of the errors occur as a result of reaching a limit. + +The following table lists query limits: + +|Limit|Value|Error if exceeded| +|-----|-----|-----------------| +| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. | 1 MB | `RowTooLarge` | +| Number of segment-granular time chunks encountered during ingestion. | 5,000 | `TooManyBuckets` | +| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles` | +| Number of output partitions for any one stage. Number of segments generated during ingestion. |25,000 | `TooManyPartitions` | +| Number of output columns for any one stage. | 2,000 | `TooManyColumns` | +| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` | +| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](#memory-usage). | `BroadcastTablesTooLarge` | + +## Error codes + +The following table describes error codes you may encounter in the `multiStageQuery.payload.status.errorReport.error.errorCode` field: + +|Code|Meaning|Additional fields| +|----|-----------|----| +| BroadcastTablesTooLarge | The size of the broadcast tables, used in right hand side of the joins, exceeded the memory reserved for them in a worker task. | `maxBroadcastTablesSize`: Memory reserved for the broadcast tables, measured in bytes. | +| Canceled | The query was canceled. Common reasons for cancellation:

  • User-initiated shutdown of the controller task via the `/druid/indexer/v1/task/{taskId}/shutdown` API.
  • Restart or failure of the server process that was running the controller task.
| | +| CannotParseExternalData | A worker task could not parse data from an external datasource. | | +| ColumnNameRestricted| The query uses a restricted column name. | | +| ColumnTypeNotSupported| Support for writing or reading from a particular column type is not supported. | | +| ColumnTypeNotSupported | The query attempted to use a column type that is not supported by the frame format. This occurs with ARRAY types, which are not yet implemented for frames. | `columnName`

`columnType` | +| InsertCannotAllocateSegment | The controller task could not allocate a new segment ID due to conflict with existing segments or pending segments. Common reasons for such conflicts:

  • Attempting to mix different granularities in the same intervals of the same datasource.
  • Prior ingestions that used non-extendable shard specs.
| `dataSource`

`interval`: The interval for the attempted new segment allocation. | +| InsertCannotBeEmpty | An INSERT or REPLACE query did not generate any output rows in a situation where output rows are required for success. This can happen for INSERT or REPLACE queries with `PARTITIONED BY` set to something other than `ALL` or `ALL TIME`. | `dataSource` | +| InsertCannotOrderByDescending | An INSERT query contained a `CLUSTERED BY` expression in descending order. Druid's segment generation code only supports ascending order. | `columnName` | +| InsertCannotReplaceExistingSegment | A REPLACE query cannot proceed because an existing segment partially overlaps those bounds, and the portion within the bounds is not fully overshadowed by query results.

There are two ways to address this without modifying your query:
  • Shrink the OVERLAP filter to match the query results.
  • Expand the OVERLAP filter to fully contain the existing segment.
| `segmentId`: The existing segment
+| InsertLockPreempted | An INSERT or REPLACE query was canceled by a higher-priority ingestion job, such as a real-time ingestion task. | | +| InsertTimeNull | An INSERT or REPLACE query encountered a null timestamp in the `__time` field.

This can happen due to using an expression like `TIME_PARSE(timestamp) AS __time` with a timestamp that cannot be parsed. (TIME_PARSE returns null when it cannot parse a timestamp.) In this case, try parsing your timestamps using a different function or pattern.

If your timestamps may genuinely be null, consider using COALESCE to provide a default value. One option is CURRENT_TIMESTAMP, which represents the start time of the job. | +| InsertTimeOutOfBounds | A REPLACE query generated a timestamp outside the bounds of the TIMESTAMP parameter for your OVERWRITE WHERE clause.

To avoid this error, verify that the you specified is valid. | `interval`: time chunk interval corresponding to the out-of-bounds timestamp | +| InvalidNullByte | A string column included a null byte. Null bytes in strings are not permitted. | `column`: The column that included the null byte | +| QueryNotSupported | QueryKit could not translate the provided native query to a multi-stage query.

This can happen if the query uses features that aren't supported, like GROUPING SETS. | | +| RowTooLarge | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for the specific limit on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. | +| TaskStartTimeout | Unable to launch all the worker tasks in time.

There might be insufficient available slots to start all the worker tasks simultaneously.

Try splitting up the query into smaller chunks with lesser `maxNumTasks` number. Another option is to increase capacity. | | +| TooManyBuckets | Exceeded the number of partition buckets for a stage. Partition buckets are only used for `segmentGranularity` during INSERT queries. The most common reason for this error is that your `segmentGranularity` is too narrow relative to the data. See the [Limits](./msq-concepts.md#limits) table for the specific limit. | `maxBuckets`: The limit on buckets. | +| TooManyInputFiles | Exceeded the number of input files/segments per worker. See the [Limits](./msq-concepts.md#limits) table for the specific limit. | `umInputFiles`: The total number of input files/segments for the stage.

`maxInputFiles`: The maximum number of input files/segments per worker per stage.

`minNumWorker`: The minimum number of workers required for a successful run. | +| TooManyPartitions | Exceeded the number of partitions for a stage. The most common reason for this is that the final stage of an INSERT or REPLACE query generated too many segments. See the [Limits](./msq-concepts.md#limits) table for the specific limit. | `maxPartitions`: The limit on partitions which was exceeded | +| TooManyColumns | Exceeded the number of columns for a stage. See the [Limits](#limits) table for the specific limit. | `maxColumns`: The limit on columns which was exceeded. | +| TooManyWarnings | Exceeded the allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit.

`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. | +| TooManyWorkers | Exceeded the supported number of workers running simultaneously. See the [Limits](#limits) table for the specific limit. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously.

`maxWorkers`: The hard or soft limit on workers that was exceeded. | +| NotEnoughMemory | Insufficient memory to launch a stage. | `serverMemory`: The amount of memory available to a single process.

`serverWorkers`: The number of workers running in a single process.

`serverThreads`: The number of threads in a single process. | +| WorkerFailed | A worker task failed unexpectedly. | `workerTaskId`: The ID of the worker task. | +| WorkerRpcFailed | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task | +| UnknownError | All other errors. | | \ No newline at end of file diff --git a/docs/multi-stage-query/msq-example-queries.md b/docs/multi-stage-query/msq-example-queries.md new file mode 100644 index 000000000000..f1b09e332f22 --- /dev/null +++ b/docs/multi-stage-query/msq-example-queries.md @@ -0,0 +1,503 @@ +--- +id: examples +title: SQL-based ingestion query examples +sidebar_label: Examples +--- + + + +> SQL-based ingestion using the multi-stage query task engine is our recommended solution starting in Druid 24.0. Alternative ingestion solutions, such as native batch and Hadoop-based ingestion systems, will still be supported. We recommend you read all [known issues](./msq-known-issues.md) and test the feature in a development environment before rolling it out in production. Using the multi-stage query task engine with `SELECT` statements that do not write to a datasource is experimental. + +These example queries show you some of the things you can do when modifying queries for your use case. Copy the example queries into the **Query** view of the Druid console and run them to see what they do. + +## INSERT with no rollup + +This example inserts data into a table named `w000` without performing any data rollup: + +
Show the query + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +INSERT INTO w000 +SELECT + TIME_PARSE("timestamp") AS __time, + isRobot, + channel, + flags, + isUnpatrolled, + page, + diffUrl, + added, + comment, + commentLength, + isNew, + isMinor, + delta, + isAnonymous, + user, + deltaBucket, + deleted, + namespace, + cityName, + countryName, + regionIsoCode, + metroCode, + countryIsoCode, + regionName +FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/wikipedia.json.gz"]}', + '{"type":"json"}', + '[{"name":"isRobot","type":"string"},{"name":"channel","type":"string"},{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"page","type":"string"},{"name":"diffUrl","type":"string"},{"name":"added","type":"long"},{"name":"comment","type":"string"},{"name":"commentLength","type":"long"},{"name":"isNew","type":"string"},{"name":"isMinor","type":"string"},{"name":"delta","type":"long"},{"name":"isAnonymous","type":"string"},{"name":"user","type":"string"},{"name":"deltaBucket","type":"long"},{"name":"deleted","type":"long"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]' + ) + ) +PARTITIONED BY HOUR +CLUSTERED BY channel +``` + +
+ +## INSERT with rollup + +This example inserts data into a table named `kttm_data` and performs data rollup. This example implements the recommendations described in [multi-value dimensions](./index.md#multi-value-dimensions). + +
Show the query + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +INSERT INTO "kttm_rollup" + +WITH kttm_data AS ( +SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/kttm/kttm-v2-2019-08-25.json.gz"]}', + '{"type":"json"}', + '[{"name":"timestamp","type":"string"},{"name":"agent_category","type":"string"},{"name":"agent_type","type":"string"},{"name":"browser","type":"string"},{"name":"browser_version","type":"string"},{"name":"city","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"version","type":"string"},{"name":"event_type","type":"string"},{"name":"event_subtype","type":"string"},{"name":"loaded_image","type":"string"},{"name":"adblock_list","type":"string"},{"name":"forwarded_for","type":"string"},{"name":"language","type":"string"},{"name":"number","type":"long"},{"name":"os","type":"string"},{"name":"path","type":"string"},{"name":"platform","type":"string"},{"name":"referrer","type":"string"},{"name":"referrer_host","type":"string"},{"name":"region","type":"string"},{"name":"remote_address","type":"string"},{"name":"screen","type":"string"},{"name":"session","type":"string"},{"name":"session_length","type":"long"},{"name":"timezone","type":"string"},{"name":"timezone_offset","type":"long"},{"name":"window","type":"string"}]' + ) +)) + +SELECT + FLOOR(TIME_PARSE("timestamp") TO MINUTE) AS __time, + session, + agent_category, + agent_type, + browser, + browser_version, + MV_TO_ARRAY("language") AS "language", -- Multi-value string dimension + os, + city, + country, + forwarded_for AS ip_address, + + COUNT(*) AS "cnt", + SUM(session_length) AS session_length, + APPROX_COUNT_DISTINCT_DS_HLL(event_type) AS unique_event_types +FROM kttm_data +WHERE os = 'iOS' +GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 +PARTITIONED BY HOUR +CLUSTERED BY browser, session +``` + +
+ +## INSERT for reindexing an existing datasource + +This example aggregates data from a table named `w000` and inserts the result into `w002`. + +
Show the query + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +INSERT INTO w002 +SELECT + FLOOR(__time TO MINUTE) AS __time, + channel, + countryIsoCode, + countryName, + regionIsoCode, + regionName, + page, + COUNT(*) AS cnt, + SUM(added) AS sum_added, + SUM(deleted) AS sum_deleted +FROM w000 +GROUP BY 1, 2, 3, 4, 5, 6, 7 +PARTITIONED BY HOUR +CLUSTERED BY page +``` + +
+ + +## INSERT with JOIN + +This example inserts data into a table named `w003` and joins data from two sources: + +
Show the query + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +INSERT INTO w003 +WITH +wikidata AS (SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/wikipedia.json.gz"]}', + '{"type":"json"}', + '[{"name":"isRobot","type":"string"},{"name":"channel","type":"string"},{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"page","type":"string"},{"name":"diffUrl","type":"string"},{"name":"added","type":"long"},{"name":"comment","type":"string"},{"name":"commentLength","type":"long"},{"name":"isNew","type":"string"},{"name":"isMinor","type":"string"},{"name":"delta","type":"long"},{"name":"isAnonymous","type":"string"},{"name":"user","type":"string"},{"name":"deltaBucket","type":"long"},{"name":"deleted","type":"long"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]' + ) +)), +countries AS (SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/lookup/country.tsv"]}', + '{"type":"tsv","findColumnsFromHeader":true}', + '[{"name":"Country","type":"string"},{"name":"Capital","type":"string"},{"name":"ISO3","type":"string"},{"name":"ISO2","type":"string"}]' + ) +)) +SELECT + TIME_PARSE("timestamp") AS __time, + isRobot, + channel, + flags, + isUnpatrolled, + page, + diffUrl, + added, + comment, + commentLength, + isNew, + isMinor, + delta, + isAnonymous, + user, + deltaBucket, + deleted, + namespace, + cityName, + countryName, + regionIsoCode, + metroCode, + countryIsoCode, + countries.Capital AS countryCapital, + regionName +FROM wikidata +LEFT JOIN countries ON wikidata.countryIsoCode = countries.ISO2 +PARTITIONED BY HOUR +``` + +
+ +## REPLACE an entire datasource + +This example replaces the entire datasource used in the table `w007` with the new query data while dropping the old data: + +
Show the query + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +REPLACE INTO w007 +OVERWRITE ALL +SELECT + TIME_PARSE("timestamp") AS __time, + isRobot, + channel, + flags, + isUnpatrolled, + page, + diffUrl, + added, + comment, + commentLength, + isNew, + isMinor, + delta, + isAnonymous, + user, + deltaBucket, + deleted, + namespace, + cityName, + countryName, + regionIsoCode, + metroCode, + countryIsoCode, + regionName +FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/wikipedia.json.gz"]}', + '{"type":"json"}', + '[{"name":"isRobot","type":"string"},{"name":"channel","type":"string"},{"name":"timestamp","type":"string"},{"name":"flags","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"page","type":"string"},{"name":"diffUrl","type":"string"},{"name":"added","type":"long"},{"name":"comment","type":"string"},{"name":"commentLength","type":"long"},{"name":"isNew","type":"string"},{"name":"isMinor","type":"string"},{"name":"delta","type":"long"},{"name":"isAnonymous","type":"string"},{"name":"user","type":"string"},{"name":"deltaBucket","type":"long"},{"name":"deleted","type":"long"},{"name":"namespace","type":"string"},{"name":"cityName","type":"string"},{"name":"countryName","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"metroCode","type":"long"},{"name":"countryIsoCode","type":"string"},{"name":"regionName","type":"string"}]' + ) + ) +PARTITIONED BY HOUR +CLUSTERED BY channel +``` + +
+ +## REPLACE for replacing a specific time segment + +This example replaces certain segments in a datasource with the new query data while dropping old segments: + +
Show the query + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +REPLACE INTO w007 +OVERWRITE WHERE __time >= TIMESTAMP '2019-08-25 02:00:00' AND __time < TIMESTAMP '2019-08-25 03:00:00' +SELECT + FLOOR(__time TO MINUTE) AS __time, + channel, + countryIsoCode, + countryName, + regionIsoCode, + regionName, + page +FROM w007 +WHERE __time >= TIMESTAMP '2019-08-25 02:00:00' AND __time < TIMESTAMP '2019-08-25 03:00:00' AND countryName = "Canada" +PARTITIONED BY HOUR +CLUSTERED BY page +``` + +
+ +## REPLACE for reindexing an existing datasource into itself + +
Show the query + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +REPLACE INTO w000 +OVERWRITE ALL +SELECT + FLOOR(__time TO MINUTE) AS __time, + channel, + countryIsoCode, + countryName, + regionIsoCode, + regionName, + page, + COUNT(*) AS cnt, + SUM(added) AS sum_added, + SUM(deleted) AS sum_deleted +FROM w000 +GROUP BY 1, 2, 3, 4, 5, 6, 7 +PARTITIONED BY HOUR +CLUSTERED BY page +``` + +
+ +## SELECT with EXTERN and JOIN + + +
Show the query + + +```sql +--:context finalizeAggregations: false +--:context groupByEnableMultiValueUnnesting: false + +WITH flights AS ( + SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/FlightCarrierOnTime/flights/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2005_11.csv.zip"]}', + '{"type":"csv","findColumnsFromHeader":true}', + '[{"name":"depaturetime","type":"string"},{"name":"arrivalime","type":"string"},{"name":"Year","type":"long"},{"name":"Quarter","type":"long"},{"name":"Month","type":"long"},{"name":"DayofMonth","type":"long"},{"name":"DayOfWeek","type":"long"},{"name":"FlightDate","type":"string"},{"name":"Reporting_Airline","type":"string"},{"name":"DOT_ID_Reporting_Airline","type":"long"},{"name":"IATA_CODE_Reporting_Airline","type":"string"},{"name":"Tail_Number","type":"string"},{"name":"Flight_Number_Reporting_Airline","type":"long"},{"name":"OriginAirportID","type":"long"},{"name":"OriginAirportSeqID","type":"long"},{"name":"OriginCityMarketID","type":"long"},{"name":"Origin","type":"string"},{"name":"OriginCityName","type":"string"},{"name":"OriginState","type":"string"},{"name":"OriginStateFips","type":"long"},{"name":"OriginStateName","type":"string"},{"name":"OriginWac","type":"long"},{"name":"DestAirportID","type":"long"},{"name":"DestAirportSeqID","type":"long"},{"name":"DestCityMarketID","type":"long"},{"name":"Dest","type":"string"},{"name":"DestCityName","type":"string"},{"name":"DestState","type":"string"},{"name":"DestStateFips","type":"long"},{"name":"DestStateName","type":"string"},{"name":"DestWac","type":"long"},{"name":"CRSDepTime","type":"long"},{"name":"DepTime","type":"long"},{"name":"DepDelay","type":"long"},{"name":"DepDelayMinutes","type":"long"},{"name":"DepDel15","type":"long"},{"name":"DepartureDelayGroups","type":"long"},{"name":"DepTimeBlk","type":"string"},{"name":"TaxiOut","type":"long"},{"name":"WheelsOff","type":"long"},{"name":"WheelsOn","type":"long"},{"name":"TaxiIn","type":"long"},{"name":"CRSArrTime","type":"long"},{"name":"ArrTime","type":"long"},{"name":"ArrDelay","type":"long"},{"name":"ArrDelayMinutes","type":"long"},{"name":"ArrDel15","type":"long"},{"name":"ArrivalDelayGroups","type":"long"},{"name":"ArrTimeBlk","type":"string"},{"name":"Cancelled","type":"long"},{"name":"CancellationCode","type":"string"},{"name":"Diverted","type":"long"},{"name":"CRSElapsedTime","type":"long"},{"name":"ActualElapsedTime","type":"long"},{"name":"AirTime","type":"long"},{"name":"Flights","type":"long"},{"name":"Distance","type":"long"},{"name":"DistanceGroup","type":"long"},{"name":"CarrierDelay","type":"long"},{"name":"WeatherDelay","type":"long"},{"name":"NASDelay","type":"long"},{"name":"SecurityDelay","type":"long"},{"name":"LateAircraftDelay","type":"long"},{"name":"FirstDepTime","type":"string"},{"name":"TotalAddGTime","type":"string"},{"name":"LongestAddGTime","type":"string"},{"name":"DivAirportLandings","type":"string"},{"name":"DivReachedDest","type":"string"},{"name":"DivActualElapsedTime","type":"string"},{"name":"DivArrDelay","type":"string"},{"name":"DivDistance","type":"string"},{"name":"Div1Airport","type":"string"},{"name":"Div1AirportID","type":"string"},{"name":"Div1AirportSeqID","type":"string"},{"name":"Div1WheelsOn","type":"string"},{"name":"Div1TotalGTime","type":"string"},{"name":"Div1LongestGTime","type":"string"},{"name":"Div1WheelsOff","type":"string"},{"name":"Div1TailNum","type":"string"},{"name":"Div2Airport","type":"string"},{"name":"Div2AirportID","type":"string"},{"name":"Div2AirportSeqID","type":"string"},{"name":"Div2WheelsOn","type":"string"},{"name":"Div2TotalGTime","type":"string"},{"name":"Div2LongestGTime","type":"string"},{"name":"Div2WheelsOff","type":"string"},{"name":"Div2TailNum","type":"string"},{"name":"Div3Airport","type":"string"},{"name":"Div3AirportID","type":"string"},{"name":"Div3AirportSeqID","type":"string"},{"name":"Div3WheelsOn","type":"string"},{"name":"Div3TotalGTime","type":"string"},{"name":"Div3LongestGTime","type":"string"},{"name":"Div3WheelsOff","type":"string"},{"name":"Div3TailNum","type":"string"},{"name":"Div4Airport","type":"string"},{"name":"Div4AirportID","type":"string"},{"name":"Div4AirportSeqID","type":"string"},{"name":"Div4WheelsOn","type":"string"},{"name":"Div4TotalGTime","type":"string"},{"name":"Div4LongestGTime","type":"string"},{"name":"Div4WheelsOff","type":"string"},{"name":"Div4TailNum","type":"string"},{"name":"Div5Airport","type":"string"},{"name":"Div5AirportID","type":"string"},{"name":"Div5AirportSeqID","type":"string"},{"name":"Div5WheelsOn","type":"string"},{"name":"Div5TotalGTime","type":"string"},{"name":"Div5LongestGTime","type":"string"},{"name":"Div5WheelsOff","type":"string"},{"name":"Div5TailNum","type":"string"},{"name":"Unnamed: 109","type":"string"}]' + ) +)), +L_AIRPORT AS ( + SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/FlightCarrierOnTime/dimensions/L_AIRPORT.csv"]}', + '{"type":"csv","findColumnsFromHeader":true}', + '[{"name":"Code","type":"string"},{"name":"Description","type":"string"}]' + ) +)), +L_AIRPORT_ID AS ( + SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/FlightCarrierOnTime/dimensions/L_AIRPORT_ID.csv"]}', + '{"type":"csv","findColumnsFromHeader":true}', + '[{"name":"Code","type":"long"},{"name":"Description","type":"string"}]' + ) +)), +L_AIRLINE_ID AS ( + SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/FlightCarrierOnTime/dimensions/L_AIRLINE_ID.csv"]}', + '{"type":"csv","findColumnsFromHeader":true}', + '[{"name":"Code","type":"long"},{"name":"Description","type":"string"}]' + ) +)), +L_CITY_MARKET_ID AS ( + SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/FlightCarrierOnTime/dimensions/L_CITY_MARKET_ID.csv"]}', + '{"type":"csv","findColumnsFromHeader":true}', + '[{"name":"Code","type":"long"},{"name":"Description","type":"string"}]' + ) +)), +L_CANCELLATION AS ( + SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/FlightCarrierOnTime/dimensions/L_CANCELLATION.csv"]}', + '{"type":"csv","findColumnsFromHeader":true}', + '[{"name":"Code","type":"string"},{"name":"Description","type":"string"}]' + ) +)), +L_STATE_FIPS AS ( + SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://static.imply.io/data/FlightCarrierOnTime/dimensions/L_STATE_FIPS.csv"]}', + '{"type":"csv","findColumnsFromHeader":true}', + '[{"name":"Code","type":"long"},{"name":"Description","type":"string"}]' + ) +)) +SELECT + depaturetime, + arrivalime, + -- "Year", + -- Quarter, + -- "Month", + -- DayofMonth, + -- DayOfWeek, + -- FlightDate, + Reporting_Airline, + + DOT_ID_Reporting_Airline, + DOTAirlineLookup.Description AS DOT_Reporting_Airline, + + IATA_CODE_Reporting_Airline, + Tail_Number, + Flight_Number_Reporting_Airline, + + OriginAirportID, + OriginAirportIDLookup.Description AS OriginAirport, + + OriginAirportSeqID, + + OriginCityMarketID, + OriginCityMarketIDLookup.Description AS OriginCityMarket, + + Origin, + OriginAirportLookup.Description AS OriginDescription, + + OriginCityName, + OriginState, + + OriginStateFips, + OriginStateFipsLookup.Description AS OriginStateFipsDescription, + + OriginStateName, + OriginWac, + + DestAirportID, + DestAirportIDLookup.Description AS DestAirport, + + DestAirportSeqID, + + DestCityMarketID, + DestCityMarketIDLookup.Description AS DestCityMarket, + + Dest, + DestAirportLookup.Description AS DestDescription, + + DestCityName, + DestState, + + DestStateFips, + DestStateFipsLookup.Description AS DestStateFipsDescription, + + DestStateName, + DestWac, + + CRSDepTime, + DepTime, + DepDelay, + DepDelayMinutes, + DepDel15, + DepartureDelayGroups, + DepTimeBlk, + TaxiOut, + WheelsOff, + WheelsOn, + TaxiIn, + CRSArrTime, + ArrTime, + ArrDelay, + ArrDelayMinutes, + ArrDel15, + ArrivalDelayGroups, + ArrTimeBlk, + + Cancelled, + CancellationCode, + CancellationCodeLookup.Description AS CancellationReason, + + Diverted, + CRSElapsedTime, + ActualElapsedTime, + AirTime, + Flights, + Distance, + DistanceGroup, + CarrierDelay, + WeatherDelay, + NASDelay, + SecurityDelay, + LateAircraftDelay, + FirstDepTime, + TotalAddGTime, + LongestAddGTime +FROM "flights" +LEFT JOIN L_AIRLINE_ID AS DOTAirlineLookup ON DOT_ID_Reporting_Airline = DOTAirlineLookup.Code +LEFT JOIN L_AIRPORT AS OriginAirportLookup ON Origin = OriginAirportLookup.Code +LEFT JOIN L_AIRPORT AS DestAirportLookup ON Dest = DestAirportLookup.Code +LEFT JOIN L_AIRPORT_ID AS OriginAirportIDLookup ON OriginAirportID = OriginAirportIDLookup.Code +LEFT JOIN L_AIRPORT_ID AS DestAirportIDLookup ON DestAirportID = DestAirportIDLookup.Code +LEFT JOIN L_CITY_MARKET_ID AS OriginCityMarketIDLookup ON OriginCityMarketID = OriginCityMarketIDLookup.Code +LEFT JOIN L_CITY_MARKET_ID AS DestCityMarketIDLookup ON DestCityMarketID = DestCityMarketIDLookup.Code +LEFT JOIN L_STATE_FIPS AS OriginStateFipsLookup ON OriginStateFips = OriginStateFipsLookup.Code +LEFT JOIN L_STATE_FIPS AS DestStateFipsLookup ON DestStateFips = DestStateFipsLookup.Code +LEFT JOIN L_CANCELLATION AS CancellationCodeLookup ON CancellationCode = CancellationCodeLookup.Code +LIMIT 1000 +``` + +
+ +## Next steps + +* [Read Multi-stage queries](./msq-example-queries.md) to learn more about how multi-stage queries work. +* [Explore the Query view](../operations/druid-console.md) to learn about the UI tools to help you get started. diff --git a/docs/multi-stage-query/msq-known-issues.md b/docs/multi-stage-query/msq-known-issues.md new file mode 100644 index 000000000000..ac4b222fe655 --- /dev/null +++ b/docs/multi-stage-query/msq-known-issues.md @@ -0,0 +1,118 @@ +--- +id: known-issues +title: SQL-based ingestion known issues +sidebar_label: Known issues +--- + + + +> SQL-based ingestion using the multi-stage query task engine is our recommended solution starting in Druid 24.0. Alternative ingestion solutions, such as native batch and Hadoop-based ingestion systems, will still be supported. We recommend you read all [known issues](./msq-known-issues.md) and test the feature in a development environment before rolling it out in production. Using the multi-stage query task engine with `SELECT` statements that do not write to a datasource is experimental. + +## General query execution + +- There's no fault tolerance. If any task fails, the entire query fails. + +- Only one local file system per server is used for stage output data during multi-stage query + execution. If your servers have multiple local file systems, this causes queries to exhaust + available disk space earlier than expected. + +- When `msqMaxNumTasks` is higher than the total + capacity of the cluster, more tasks may be launched than can run at once. This leads to a + [TaskStartTimeout](./msq-reference.md#context-parameters) error code, as there is never enough capacity to run the query. + To avoid this, set `msqMaxNumTasks` to a number of tasks that can run simultaneously on your cluster. + +- When `msqTaskAssignment` is set to `auto`, the system generates one task per input file for certain splittable + input sources where file sizes are not known ahead of time. This includes the `http` input source, where the system + generates one task per URI. + +## Memory usage + +- INSERT queries can consume excessive memory when using complex types due to inaccurate footprint + estimation. This can appear as an OutOfMemoryError during the SegmentGenerator stage when using + sketches. If you run into this issue, try manually lowering the value of the + [`msqRowsInMemory`](./msq-reference.md#context-parameters) parameter. + +- EXTERN loads an entire row group into memory at once when reading from Parquet files. Row groups + can be up to 1 GB in size, which can lead to excessive heap usage when reading many files in + parallel. This can appear as an OutOfMemoryError during stages that read Parquet input files. If + you run into this issue, try using a smaller number of worker tasks or you can increase the heap + size of your Indexers or of your Middle Manager-launched indexing tasks. + +- Ingesting a very long row may consume excessive memory and result in an OutOfMemoryError. If a row is read + which requires more memory than is available, the service might throw OutOfMemoryError. If you run into this + issue, allocate enough memory to be able to store the largest row to the indexer. + +## SELECT queries + +- SELECT query results do not include real-time data until it has been published. + +- TIMESTAMP types are formatted as numbers rather than ISO8601 timestamp + strings, which differs from Druid's standard result format. + +- BOOLEAN types are formatted as numbers like `1` and `0` rather + than `true` or `false`, which differs from Druid's standard result + format. + +- TopN is not implemented. The context parameter + `useApproximateTopN` is ignored and always treated as if it + were `false`. Therefore, topN-shaped queries will + always run using the groupBy engine. There is no loss of + functionality, but there may be a performance impact, since + these queries will run using an exact algorithm instead of an + approximate one. +- GROUPING SETS is not implemented. Queries that use GROUPING SETS + will fail. +- The numeric flavors of the EARLIEST and LATEST aggregators do not work properly. Attempting to use the numeric flavors of these aggregators will lead to an error like `java.lang.ClassCastException: class java.lang.Double cannot be cast to class org.apache.druid.collections.SerializablePair`. The string flavors, however, do work properly. + +## INSERT queries + +- The [schemaless dimensions](../ingestion/ingestion-spec.md#inclusions-and-exclusions) +feature is not available. All columns and their types must be specified explicitly. + +- [Segment metadata queries](../querying/segmentmetadataquery.md) + on datasources ingested with the Multi-Stage Query Engine will return values for`timestampSpec` that are not usable + for introspection. + +- When INSERT with GROUP BY does the match the criteria mentioned in [GROUP BY](./index.md#group-by), the multi-stage engine generates segments that Druid's compaction + functionality is not able to further roll up. This applies to automatic compaction as well as manually + issued `compact` tasks. Individual queries executed with the multi-stage engine always guarantee + perfect rollup for their output, so this only matters if you are performing a sequence of INSERT + queries that each append data to the same time chunk. If necessary, you can compact such data + using another SQL query instead of a `compact` task. + +- When using INSERT with GROUP BY, splitting of large partitions is not currently + implemented. If a single partition key appears in a + very large number of rows, an oversized segment will be created. + You can mitigate this by adding additional columns to your + partition key. Note that partition splitting _does_ work properly + when performing INSERT without GROUP BY. + +- INSERT with column lists, like + `INSERT INTO tbl (a, b, c) SELECT ...`, is not implemented. + +## EXTERN queries + +- EXTERN does not accept `druid` input sources. + +## Missing guardrails + +- Maximum number of input files. Since there's no limit, the controller can potentially run out of memory tracking all input files + +- Maximum amount of local disk space to use for temporary data. No guardrail today means worker tasks may exhaust all available disk space. In this case, you will receive an [UnknownError](./msq-reference.md#error-codes)) with a message including "No space left on device". \ No newline at end of file diff --git a/docs/multi-stage-query/msq-reference.md b/docs/multi-stage-query/msq-reference.md new file mode 100644 index 000000000000..f5ef07124c19 --- /dev/null +++ b/docs/multi-stage-query/msq-reference.md @@ -0,0 +1,169 @@ +--- +id: reference +title: SQL-based ingestion reference +sidebar_label: Reference +--- + + + +> SQL-based ingestion using the multi-stage query task engine is our recommended solution starting in Druid 24.0. Alternative ingestion solutions, such as native batch and Hadoop-based ingestion systems, will still be supported. We recommend you read all [known issues](./msq-known-issues.md) and test the feature in a development environment before rolling it out in production. Using the multi-stage query task engine with `SELECT` statements that do not write to a datasource is experimental. + +This topic is a reference guide for the multi-stage query architecture in Apache Druid. + +## Context parameters + +In addition to the Druid SQL [context parameters](../querying/sql-query-context.md), the multi-stage query task engine accepts certain context parameters that are specific to it. + +Use context parameters alongside your queries to customize the behavior of the query. If you're using the API, include the context parameters in the query context when you submit a query: + +```json +{ + "query": "SELECT 1 + 1", + "context": { + "": "", + "maxNumTasks": 3 + } +} +``` + +If you're using the Druid console, you can specify the context parameters through various UI options. + +The following table lists the context parameters for the MSQ task engine: + +|Parameter|Description|Default value| +|---------|-----------|-------------| +| maxNumTasks | SELECT, INSERT, REPLACE

The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.

May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 | +| taskAssignment | SELECT, INSERT, REPLACE

Determines how many tasks to use. Possible values include:
  • `max`: Use as many tasks as possible, up to the maximum `maxNumTasks`.
  • `auto`: Use as few tasks as possible without exceeding 10 GiB or 10,000 files per task. Review the [limitations](./msq-known-issues.md#general-query-execution) of `auto` mode before using it.
  • | `max` | +| finalizeAggregations | SELECT, INSERT, REPLACE

    Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true | +| rowsInMemory | INSERT or REPLACE

    Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues around memory usage](./msq-known-issues.md#memory-usage). | 100,000 | +| segmentSortOrder | INSERT or REPLACE

    Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](./index.md#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.

    You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list | +| maxParseExceptions| SELECT, INSERT, REPLACE

    Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 | +| rowsPerSegment | INSERT or REPLACE

    The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 | +| sqlTimeZone | Sets the time zone for this connection, which affects how time functions and timestamp literals behave. Use a time zone name like "America/Los_Angeles" or offset like "-08:00".| `druid.sql.planner.sqlTimeZone` on the Broker (default: UTC)| +| useApproximateCountDistinct | Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.| `druid.sql.planner.useApproximateCountDistinct` on the Broker (default: true)| + +## Error codes + +Error codes have corresponding human-readable messages that explain the error. For more information about the error codes, see [Error codes](./msq-concepts.md#error-codes). + +## SQL syntax + +The MSQ task engine has three primary SQL functions: + +- EXTERN +- INSERT +- REPLACE + +For information about using these functions and their corresponding examples, see [MSQ task engine query syntax](./index.md#msq-task-engine-query-syntax). For information about adjusting the shape of your data, see [Adjust query behavior](./index.md#adjust-query-behavior). + +### EXTERN + +Use the EXTERN function to read external data. + +Function format: + +```sql +SELECT + +FROM TABLE( + EXTERN( + '', + '', + '' + ) +) +``` + +EXTERN consists of the following parts: + +1. Any [Druid input source](../ingestion/native-batch-input-source.md) as a JSON-encoded string. +2. Any [Druid input format](../ingestion/data-formats.md) as a JSON-encoded string. +3. A row signature, as a JSON-encoded array of column descriptors. Each column descriptor must have a `name` and a `type`. The type can be `string`, `long`, `double`, or `float`. This row signature is used to map the external data into the SQL layer. + +### INSERT + +Use the INSERT function to insert data. + +Unlike standard SQL, INSERT inserts data according to column name and not positionally. This means that it is important for the output column names of subsequent INSERT queries to be the same as the table. Do not rely on their positions within the SELECT clause. + +Function format: + +```sql +INSERT INTO +SELECT + +FROM
    +PARTITIONED BY