From ccdbc746c1ff2690f94f0b6f15ed1a7b041a1a19 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 10 Jun 2024 16:29:40 -0400 Subject: [PATCH 1/7] SQL syntax error should target USER persona --- .../druid/sql/calcite/planner/DruidPlanner.java | 8 +------- .../druid/sql/calcite/planner/QueryHandler.java | 11 +---------- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +----- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 4b697a0d5dfa..cf1d22eb39b4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -380,13 +380,7 @@ public static DruidException translateException(Exception e) } } - return DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.UNCATEGORIZED) - .build( - inner, - "Unable to parse the SQL, unrecognized error from calcite: [%s]", - inner.getMessage() - ); + return InvalidSqlInput.exception(inner.getMessage()); } catch (RelOptPlanner.CannotPlanException inner) { return DruidException.forPersona(DruidException.Persona.USER) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 9f15d3822866..2d292621b57d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,16 +684,7 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. 
- // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual - // user. - throw DruidException.forPersona(DruidException.Persona.ADMIN) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - exception, - "Query could not be planned. A possible reason is [%s]", - errorMessage - ); + throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index dfee7d0e3a22..9c34c89bd8fd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,11 +664,7 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - if (testBuilder().isDecoupledMode()) { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); - } else { - return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); - } + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); } /** From abe109262edf128e67f6373b96dc62ede370e3ea Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 14:34:12 -0400 Subject: [PATCH 2/7] * revert change to queryHandler and related tests, based on review comments --- .../druid/sql/calcite/planner/QueryHandler.java | 11 ++++++++++- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 2d292621b57d..9f15d3822866 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,7 +684,16 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); + // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. + // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual + // user. + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + exception, + "Query could not be planned. A possible reason is [%s]", + errorMessage + ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 9c34c89bd8fd..dfee7d0e3a22 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,7 +664,11 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + if (testBuilder().isDecoupledMode()) { + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + } else { + return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); + } } /** From 5d730475dca1c90cc2fd4abc15d890a1d812acb1 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 15:58:16 -0400 Subject: [PATCH 3/7] * add test --- .../druid/sql/calcite/CalciteInsertDmlTest.java | 12 ++++++++++++ 1 file changed, 12 
insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index bb9c03aa3c88..b40a4c87c3ab 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1626,6 +1626,18 @@ public void testInsertWithInvalidSelectStatement() .verify(); } + @Test + public void testInsertWithLongIdentifier() + { + // This test fails because an identifier of length 200 is specified, which exceeds the length limit of 128 + // characters. + String longIdentifier = new String(new char[200]).replace('\0', 'a'); + testIngestionQuery() + .sql(StringUtils.format("INSERT INTO t SELECT %s FROM foo PARTITIONED BY ALL", longIdentifier)) + .expectValidationError(invalidSqlContains(StringUtils.format("Length of identifier '%s' must be less than or equal to 128 characters", longIdentifier))) + .verify(); + } + @Test public void testInsertWithUnnamedColumnInSelectStatement() { From 524c951445daee2726ba7f7f73a912f3286a8b92 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 5 Aug 2024 11:45:53 -0400 Subject: [PATCH 4/7] Docs for Kinesis input format --- docs/ingestion/data-formats.md | 142 ++++++++++++++++++++++++++++ docs/ingestion/kinesis-ingestion.md | 1 + 2 files changed, 143 insertions(+) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index c9c23896a286..f4995c83d6da 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -731,6 +731,148 @@ This query returns: |--------------------|-----------|---------------|---------------| | `development` | `wiki-edit` | `1680795276351` | `wiki-edits` | +### Kinesis + +The `kinesis` input format lets you parse the Kinesis metadata fields in addition to the Kinesis payload value contents. +It should only be used when ingesting from Kinesis. 
+ +The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the kinesis event timestamp, and partition key. + +If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence. +This ensures that upgrading a Kinesis ingestion to use the Kinesis input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data. + +Configure the Kinesis `inputFormat` as follows: + +| Field | Type | Description | Required | Default | +|-------|------|---------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------| +| `type` | String | Set value to `kinesis`. | yes || +| `valueFormat` | [InputFormat](#input-format) | The [input format](#input-format) to parse the Kinesis value payload. | yes || +| `partitionKeyColumnName` | String | The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource. | no | `kinesis.partitionKey` | +| `timestampColumnName` | String | The name of the column for the Kinesis timestamp. | no | `kinesis.timestamp` | + +#### Example + +Using `{ "type": "json" }` as the input format would only parse the payload value. +To parse the Kinesis metadata in addition to the payload, use the `kinesis` input format. 
+ +For example, consider the following structure for a Kafka message that represents an edit in a development environment: + +- **Kinesis timestamp**: `1680795276351` +- **Kinesis partition key**: `partition-1` +- **Kinesis payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}` + +You would configure it as follows: + +```json +{ + "ioConfig": { + "inputFormat": { + "type": "kinesis", + "valueFormat": { + "type": "json" + }, + "timestampColumnName": "kinesis.timestamp", + "partitionKeyColumnName": "kinesis.partitionKey" + } + } +} +``` + +You would parse the example message as follows: + +```json +{ + "channel": "#sv.wikipedia", + "timestamp": "2016-06-27T00:00:11.080Z", + "page": "Salo Toraut", + "delta": 31, + "namespace": "Main", + "kinesis.timestamp": 1680795276351, + "kinesis.partitionKey": "partition-1" +} +``` + +If you want to use `kinesis.timestamp` as Druid's primary timestamp (`__time`), specify it as the value for `column` in the `timestampSpec`: + +```json +"timestampSpec": { + "column": "kinesis.timestamp", + "format": "millis" +} +``` + +Finally, add these Kinesis metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns. + +The following supervisor spec demonstrates how to ingest the Kinesis timestamp, and partition key into Druid dimensions: + +
+Click to view the example + +```json +{ + "type": "kinesis", + "spec": { + "ioConfig": { + "type": "kinesis", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "topic": "wiki-edits", + "inputFormat": { + "type": "kinesis", + "valueFormat": { + "type": "json" + }, + "headerFormat": { + "type": "string" + }, + "keyFormat": { + "type": "tsv", + "findColumnsFromHeader": false, + "columns": ["x"] + } + }, + "useEarliestOffset": true + }, + "dataSchema": { + "dataSource": "wikiticker", + "timestampSpec": { + "column": "timestamp", + "format": "posix" + }, + "dimensionsSpec": { + "useSchemaDiscovery": true, + "includeAllDimensions": true + }, + "granularitySpec": { + "queryGranularity": "none", + "rollup": false, + "segmentGranularity": "day" + } + }, + "tuningConfig": { + "type": "kinesis" + } + } +} +``` +
+ +After Druid ingests the data, you can query the Kinesis metadata columns as follows: + +```sql +SELECT + "kinesis.timestamp", + "kinesis.partitionKey" +FROM "wikiticker" +``` + +This query returns: + +| `kinesis.timestamp` | `kinesis.topic` | +|---------------------|-----------------| +| `1680795276351` | `partition-1` | + ## FlattenSpec You can use the `flattenSpec` object to flatten nested data, as an alternative to the Druid [nested columns](../querying/nested-columns.md) feature, and for nested input formats unsupported by the feature. It is an object within the `inputFormat` object. diff --git a/docs/ingestion/kinesis-ingestion.md b/docs/ingestion/kinesis-ingestion.md index 3b4c5de8548d..8ea2cfd4af35 100644 --- a/docs/ingestion/kinesis-ingestion.md +++ b/docs/ingestion/kinesis-ingestion.md @@ -139,6 +139,7 @@ The Kinesis indexing service supports both [`inputFormat`](data-formats.md#input The Kinesis indexing service supports the following values for `inputFormat`: +* `kinesis` * `csv` * `tvs` * `json` From 008989fc4589e78d369580df59f5d3491357db09 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 5 Aug 2024 11:48:10 -0400 Subject: [PATCH 5/7] * remove reference to kafka --- docs/ingestion/data-formats.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index f4995c83d6da..1616439384dd 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -755,7 +755,7 @@ Configure the Kinesis `inputFormat` as follows: Using `{ "type": "json" }` as the input format would only parse the payload value. To parse the Kinesis metadata in addition to the payload, use the `kinesis` input format. 
-For example, consider the following structure for a Kafka message that represents an edit in a development environment: +For example, consider the following structure for a Kinesis message that represents an edit in a development environment: - **Kinesis timestamp**: `1680795276351` - **Kinesis partition key**: `partition-1` From d6dd82d5bceacbeda04b52732f1d44126957c251 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 5 Aug 2024 14:58:13 -0400 Subject: [PATCH 6/7] * fix spellcheck error --- docs/ingestion/data-formats.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 1616439384dd..a56c941e4f52 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -736,7 +736,7 @@ This query returns: The `kinesis` input format lets you parse the Kinesis metadata fields in addition to the Kinesis payload value contents. It should only be used when ingesting from Kinesis. -The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the kinesis event timestamp, and partition key. +The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the Kinesis event timestamp, and partition key. If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence. This ensures that upgrading a Kinesis ingestion to use the Kinesis input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data. 
From 27d44e25c5b8b01b535a3b9343d66fbc580be422 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 5 Aug 2024 19:52:56 -0400 Subject: [PATCH 7/7] Apply suggestions from code review Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com> --- docs/ingestion/data-formats.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index a56c941e4f52..96d8597e7f34 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -736,7 +736,7 @@ This query returns: The `kinesis` input format lets you parse the Kinesis metadata fields in addition to the Kinesis payload value contents. It should only be used when ingesting from Kinesis. -The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the Kinesis event timestamp, and partition key. +The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the Kinesis event timestamp and partition key, the `ApproximateArrivalTimestamp` and `PartitionKey` fields in the Kinesis record. If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence. This ensures that upgrading a Kinesis ingestion to use the Kinesis input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data. @@ -755,7 +755,7 @@ Configure the Kinesis `inputFormat` as follows: Using `{ "type": "json" }` as the input format would only parse the payload value. To parse the Kinesis metadata in addition to the payload, use the `kinesis` input format. 
-For example, consider the following structure for a Kinesis message that represents an edit in a development environment: +For example, consider the following structure for a Kinesis record that represents an edit in a development environment: - **Kinesis timestamp**: `1680795276351` - **Kinesis partition key**: `partition-1` @@ -778,7 +778,7 @@ You would configure it as follows: } ``` -You would parse the example message as follows: +You would parse the example record as follows: ```json { @@ -801,7 +801,7 @@ If you want to use `kinesis.timestamp` as Druid's primary timestamp (`__time`), } ``` -Finally, add these Kinesis metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns. +Finally, add these Kinesis metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to automatically detect columns. The following supervisor spec demonstrates how to ingest the Kinesis timestamp, and partition key into Druid dimensions: