From c89e67b7575c62565ded754d1599045fe56339d9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 16 Feb 2024 18:45:15 -0800 Subject: [PATCH 1/8] MSQ: Validate that strings and string arrays are not mixed. When multi-value strings and string arrays coexist in the same column, it causes problems with "classic MVD" style queries such as: select * from wikipedia -- fails at runtime select count(*) from wikipedia where flags = 'B' -- fails at planning time select flags, count(*) from wikipedia group by 1 -- fails at runtime To avoid these problems, this patch adds type verification for INSERT and REPLACE. It is targeted: the only type changes that are blocked are string-to-array and array-to-string. There is also a way to exclude certain columns from the type checks, if the user really knows what they're doing. --- .../apache/druid/msq/exec/ControllerImpl.java | 11 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 266 ++++++++++++++---- .../druid/msq/util/DimensionSchemaUtils.java | 101 ++++--- .../msq/util/MultiStageQueryContext.java | 29 +- .../msq/util/MultiStageQueryContextTest.java | 2 +- 5 files changed, 293 insertions(+), 116 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index d62bcce04ddc..b9e06a7b83d5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -2131,9 +2131,14 @@ private static Pair, List> makeDimensio // deprecation and removal in future if (MultiStageQueryContext.getArrayIngestMode(query.context()) == ArrayIngestMode.MVD) { log.warn( - "'%s' is set to 'mvd' in the query's context. This ingests the string arrays as multi-value " - + "strings instead of arrays, and is preserved for legacy reasons when MVDs were the only way to ingest string " - + "arrays in Druid. It is incorrect behaviour and will likely be removed in the future releases of Druid", + "%s[mvd] is active for this task. This causes string arrays (VARCHAR ARRAY in SQL) to be ingested as " + + "multi-value strings rather than true arrays. This behavior may change in a future version of Druid. To be " + + "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer " + + "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write " + + "out multi-value string dimensions using ARRAY_TO_MV. " + + "See https://druid.apache.org/docs/latest/querying/arrays" + + "#differences-between-arrays-and-multi-value-dimensions for more details.", + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, MultiStageQueryContext.CTX_ARRAY_INGEST_MODE ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 6f4f109ffa4b..0951e05e129b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -29,20 +29,29 @@ import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Pair; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.error.InvalidSqlInput; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.msq.querykit.QueryKitUtils; +import org.apache.druid.msq.util.ArrayIngestMode; +import org.apache.druid.msq.util.DimensionSchemaUtils; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.sql.calcite.planner.DruidTypeSystem; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.EngineFeature; import org.apache.druid.sql.calcite.run.NativeSqlEngine; @@ -50,7 +59,9 @@ import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.run.SqlEngines; import org.apache.druid.sql.destination.IngestDestination; +import org.apache.druid.sql.destination.TableDestination; +import javax.annotation.Nullable; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -162,7 +173,18 @@ public QueryMaker buildQueryMakerForInsert( final PlannerContext plannerContext ) { - validateInsert(relRoot.rel, relRoot.fields, plannerContext); + validateInsert( + relRoot.rel, + relRoot.fields, + destination instanceof TableDestination + ? plannerContext.getPlannerToolbox() + .rootSchema() + .getNamedSchema(plannerContext.getPlannerToolbox().druidSchemaName()) + .getSchema() + .getTable(((TableDestination) destination).getTableName()) + : null, + plannerContext + ); return new MSQTaskQueryMaker( destination, @@ -193,65 +215,78 @@ private static void validateSelect(final PlannerContext plannerContext) } } + /** + * Engine-specific validation that happens after the query is planned. + */ private static void validateInsert( final RelNode rootRel, final List> fieldMappings, + @Nullable Table targetTable, final PlannerContext plannerContext ) { + final int timeColumnIndex = getTimeColumnIndex(fieldMappings); + final Granularity segmentGranularity = getSegmentGranularity(plannerContext); + validateNoDuplicateAliases(fieldMappings); + validateTimeColumnType(rootRel, timeColumnIndex); + validateTimeColumnExistsIfNeeded(timeColumnIndex, segmentGranularity); + validateLimitAndOffset(rootRel, Granularities.ALL.equals(segmentGranularity)); + validateTypeChanges(rootRel, fieldMappings, targetTable, plannerContext); + } - // Find the __time field. - int timeFieldIndex = -1; + /** + * SQL allows multiple output columns with the same name. However, we don't allow this for INSERT or REPLACE + * queries, because we use these output names to generate columns in segments. They must be unique. + */ + private static void validateNoDuplicateAliases(final List> fieldMappings) + { + final Set aliasesSeen = new HashSet<>(); for (final Pair field : fieldMappings) { - if (field.right.equals(ColumnHolder.TIME_COLUMN_NAME)) { - timeFieldIndex = field.left; - - // Validate the __time field has the proper type. - final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName(); - if (timeType != SqlTypeName.TIMESTAMP) { - throw InvalidSqlInput.exception( - "Field [%s] was the wrong type [%s], expected TIMESTAMP", - ColumnHolder.TIME_COLUMN_NAME, - timeType - ); - } + if (!aliasesSeen.add(field.right)) { + throw InvalidSqlInput.exception("Duplicate field in SELECT: [%s]", field.right); } } + } - // Validate that if segmentGranularity is not ALL then there is also a __time field. - final Granularity segmentGranularity; - - try { - segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext( - plannerContext.getJsonMapper(), - plannerContext.queryContextMap() - ); + /** + * Validate the time field {@link ColumnHolder#TIME_COLUMN_NAME} has type TIMESTAMP. + * + * @param rootRel root rel + * @param timeColumnIndex index of the time field + */ + private static void validateTimeColumnType(final RelNode rootRel, final int timeColumnIndex) + { + if (timeColumnIndex < 0) { + return; } - catch (Exception e) { - // This is a defensive check as the DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is - // populated by Druid. If the user entered an incorrect granularity, that should have been flagged before reaching - // here - throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.DEFENSIVE) - .build( - e, - "[%s] is not a valid value for [%s]", - plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY), - DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY - ); + // Validate the __time field has the proper type. + final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(timeColumnIndex).getType().getSqlTypeName(); + if (timeType != SqlTypeName.TIMESTAMP) { + throw InvalidSqlInput.exception( + "Field[%s] was the wrong type[%s], expected TIMESTAMP", + ColumnHolder.TIME_COLUMN_NAME, + timeType + ); } + } + /** + * Validate that if segmentGranularity is not ALL, then there is also a {@link ColumnHolder#TIME_COLUMN_NAME} field. + * + * @param segmentGranularity granularity from {@link #getSegmentGranularity(PlannerContext)} + * @param timeColumnIndex index of the time field + */ + private static void validateTimeColumnExistsIfNeeded( + final int timeColumnIndex, + final Granularity segmentGranularity + ) + { final boolean hasSegmentGranularity = !Granularities.ALL.equals(segmentGranularity); - // Validate that the query does not have an inappropriate LIMIT or OFFSET. LIMIT prevents gathering result key - // statistics, which INSERT execution logic depends on. (In QueryKit, LIMIT disables statistics generation and - // funnels everything through a single partition.) - validateLimitAndOffset(rootRel, !hasSegmentGranularity); - - if (hasSegmentGranularity && timeFieldIndex < 0) { + if (hasSegmentGranularity && timeColumnIndex < 0) { throw InvalidInput.exception( "The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL. " + "Therefore, the query must specify a time column (named __time).", @@ -261,29 +296,24 @@ private static void validateInsert( } /** - * SQL allows multiple output columns with the same name. However, we don't allow this for INSERT or REPLACE - * queries, because we use these output names to generate columns in segments. They must be unique. + * Validate that the query does not have an inappropriate LIMIT or OFFSET. LIMIT prevents gathering result key + * statistics, which INSERT execution logic depends on. (In QueryKit, LIMIT disables statistics generation and + * funnels everything through a single partition.) + * + * LIMIT is allowed when segment granularity is ALL, disallowed otherwise. OFFSET is never allowed. + * + * @param rootRel root rel + * @param limitOk whether LIMIT is ok (OFFSET is never ok) */ - private static void validateNoDuplicateAliases(final List> fieldMappings) - { - final Set aliasesSeen = new HashSet<>(); - - for (final Pair field : fieldMappings) { - if (!aliasesSeen.add(field.right)) { - throw InvalidSqlInput.exception("Duplicate field in SELECT: [%s]", field.right); - } - } - } - - private static void validateLimitAndOffset(final RelNode topRel, final boolean limitOk) + private static void validateLimitAndOffset(final RelNode rootRel, final boolean limitOk) { Sort sort = null; - if (topRel instanceof Sort) { - sort = (Sort) topRel; - } else if (topRel instanceof Project) { + if (rootRel instanceof Sort) { + sort = (Sort) rootRel; + } else if (rootRel instanceof Project) { // Look for Project after a Sort, then validate the sort. - final Project project = (Project) topRel; + final Project project = (Project) rootRel; if (project.isMapping()) { final RelNode projectInput = project.getInput(); if (projectInput instanceof Sort) { @@ -307,6 +337,124 @@ private static void validateLimitAndOffset(final RelNode topRel, final boolean l } } + /** + * Validate that the query does not include any type changes from string to array or vice versa. + * + * These type changes tend to cause problems due to mixing of multi-value strings and string arrays. In particular, + * many queries written in the "classic MVD" style (treating MVDs as if they were regular strings) will fail when + * MVDs and arrays are mixed. So, we detect them as invalid. + * + * @param rootRel root rel + * @param fieldMappings field mappings from {@link #validateInsert(RelNode, List, Table, PlannerContext)} + * @param targetTable table we are inserting (or replacing) into, if any + * @param plannerContext planner context + */ + private static void validateTypeChanges( + final RelNode rootRel, + final List> fieldMappings, + @Nullable final Table targetTable, + final PlannerContext plannerContext + ) + { + if (targetTable == null) { + return; + } + + final Set columnsExcludedFromTypeVerification = + MultiStageQueryContext.getColumnsExcludedFromTypeVerification(plannerContext.queryContext()); + final ArrayIngestMode arrayIngestMode = MultiStageQueryContext.getArrayIngestMode(plannerContext.queryContext()); + + for (Pair fieldMapping : fieldMappings) { + final int columnIndex = fieldMapping.left; + final String columnName = fieldMapping.right; + final RelDataTypeField oldSqlTypeField = + targetTable.getRowType(DruidTypeSystem.TYPE_FACTORY).getField(columnName, true, false); + + if (!columnsExcludedFromTypeVerification.contains(columnName) && oldSqlTypeField != null) { + final ColumnType oldDruidType = Calcites.getColumnTypeForRelDataType(oldSqlTypeField.getType()); + final RelDataType newSqlType = rootRel.getRowType().getFieldList().get(columnIndex).getType(); + final ColumnType newDruidType = + DimensionSchemaUtils.getDimensionType(Calcites.getColumnTypeForRelDataType(newSqlType), arrayIngestMode); + + if (newDruidType.isArray() && oldDruidType.is(ValueType.STRING) + || (newDruidType.is(ValueType.STRING) && oldDruidType.isArray())) { + final StringBuilder messageBuilder = new StringBuilder( + StringUtils.format( + "Cannot write into field[%s] using type[%s] and arrayIngestMode[%s], since the existing type is[%s]", + columnName, + newSqlType, + StringUtils.toLowerCase(arrayIngestMode.toString()), + oldSqlTypeField.getType() + ) + ); + + if (newDruidType.is(ValueType.STRING) + && newSqlType.getSqlTypeName() == SqlTypeName.ARRAY + && arrayIngestMode == ArrayIngestMode.MVD) { + // Tried to insert an ARRAY, which got turned into a STRING by arrayIngestMode: mvd. + messageBuilder.append(". Try setting arrayIngestMode to[array] to retain the SQL type[") + .append(newSqlType) + .append("]"); + } + + if (newDruidType.is(ValueType.ARRAY) + && oldDruidType.is(ValueType.STRING) + && arrayIngestMode == ArrayIngestMode.ARRAY) { + // Tried to insert an ARRAY, which stayed an ARRAY, but wasn't compatible with existing STRING. + messageBuilder.append(". Try wrapping this field using ARRAY_TO_MV(...) AS ") + .append(CalciteSqlDialect.DEFAULT.quoteIdentifier(columnName)); + } + + throw InvalidSqlInput.exception(StringUtils.encodeForFormat(messageBuilder.toString())); + } + } + } + } + + /** + * Returns the index of {@link ColumnHolder#TIME_COLUMN_NAME} within a list of field mappings from + * {@link #validateInsert(RelNode, List, Table, PlannerContext)}. + * + * Returns -1 if the list does not contain a time column. + */ + private static int getTimeColumnIndex(final List> fieldMappings) + { + for (final Pair field : fieldMappings) { + if (field.right.equals(ColumnHolder.TIME_COLUMN_NAME)) { + return field.left; + } + } + + return -1; + } + + /** + * Retrieve the segment granularity for a query. + */ + private static Granularity getSegmentGranularity(final PlannerContext plannerContext) + { + try { + return QueryKitUtils.getSegmentGranularityFromContext( + plannerContext.getJsonMapper(), + plannerContext.queryContextMap() + ); + } + catch (Exception e) { + // This is a defensive check as the DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is + // populated by Druid. If the user entered an incorrect granularity, that should have been flagged before reaching + // here. + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build( + e, + "[%s] is not a valid value for [%s]", + plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY), + DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY + ); + + } + } + private static RelDataType getMSQStructType(RelDataTypeFactory typeFactory) { return typeFactory.createStructType( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java index 748d411c97ce..38da3cd2e2d4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java @@ -78,55 +78,74 @@ public static DimensionSchema createDimensionSchema( } return new AutoTypeColumnSchema(column, null); } else { - // if schema information is not available, create a string dimension - if (type == null) { - return new StringDimensionSchema(column); - } else if (type.getType() == ValueType.STRING) { + final ColumnType dimensionType = getDimensionType(type, arrayIngestMode); + + if (dimensionType.getType() == ValueType.STRING) { return new StringDimensionSchema(column); - } else if (type.getType() == ValueType.LONG) { + } else if (dimensionType.getType() == ValueType.LONG) { return new LongDimensionSchema(column); - } else if (type.getType() == ValueType.FLOAT) { + } else if (dimensionType.getType() == ValueType.FLOAT) { return new FloatDimensionSchema(column); - } else if (type.getType() == ValueType.DOUBLE) { + } else if (dimensionType.getType() == ValueType.DOUBLE) { return new DoubleDimensionSchema(column); - } else if (type.getType() == ValueType.ARRAY) { - ValueType elementType = type.getElementType().getType(); - if (elementType == ValueType.STRING) { - if (arrayIngestMode == ArrayIngestMode.NONE) { - throw InvalidInput.exception( - "String arrays can not be ingested when '%s' is set to '%s'. Set '%s' in query context " - + "to 'array' to ingest the string array as an array, or ingest it as an MVD by explicitly casting the " - + "array to an MVD with ARRAY_TO_MV function.", - MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, - StringUtils.toLowerCase(arrayIngestMode.name()), - MultiStageQueryContext.CTX_ARRAY_INGEST_MODE - ); - } else if (arrayIngestMode == ArrayIngestMode.MVD) { - return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null); - } else { - // arrayIngestMode == ArrayIngestMode.ARRAY would be true - return new AutoTypeColumnSchema(column, type); - } - } else if (elementType.isNumeric()) { - // ValueType == LONG || ValueType == FLOAT || ValueType == DOUBLE - if (arrayIngestMode == ArrayIngestMode.ARRAY) { - return new AutoTypeColumnSchema(column, type); - } else { - throw InvalidInput.exception( - "Numeric arrays can only be ingested when '%s' is set to 'array' in the MSQ query's context. " - + "Current value of the parameter [%s]", - MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, - StringUtils.toLowerCase(arrayIngestMode.name()) - ); - } - } else { - throw new ISE("Cannot create dimension for type [%s]", type.toString()); - } + } else if (dimensionType.getType() == ValueType.ARRAY) { + return new AutoTypeColumnSchema(column, dimensionType); } else { - final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type); + final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(dimensionType); return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null) .getDimensionSchema(capabilities); } } } + + /** + * Based on a type from a query result, get the type of dimension we should write. + * + * @throws org.apache.druid.error.DruidException if there is some problem + */ + public static ColumnType getDimensionType( + @Nullable final ColumnType queryType, + final ArrayIngestMode arrayIngestMode + ) + { + if (queryType == null) { + // if schema information is not available, create a string dimension + return ColumnType.STRING; + } else if (queryType.getType() == ValueType.ARRAY) { + ValueType elementType = queryType.getElementType().getType(); + if (elementType == ValueType.STRING) { + if (arrayIngestMode == ArrayIngestMode.NONE) { + throw InvalidInput.exception( + "String arrays can not be ingested when '%s' is set to '%s'. Set '%s' in query context " + + "to 'array' to ingest the string array as an array, or ingest it as an MVD by explicitly casting the " + + "array to an MVD with the ARRAY_TO_MV function.", + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, + StringUtils.toLowerCase(arrayIngestMode.name()), + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE + ); + } else if (arrayIngestMode == ArrayIngestMode.MVD) { + return ColumnType.STRING; + } else { + assert arrayIngestMode == ArrayIngestMode.ARRAY; + return queryType; + } + } else if (elementType.isNumeric()) { + // ValueType == LONG || ValueType == FLOAT || ValueType == DOUBLE + if (arrayIngestMode == ArrayIngestMode.ARRAY) { + return queryType; + } else { + throw InvalidInput.exception( + "Numeric arrays can only be ingested when '%s' is set to 'array'. " + + "Current value of the parameter is[%s]", + MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, + StringUtils.toLowerCase(arrayIngestMode.name()) + ); + } + } else { + throw new ISE("Cannot create dimension for type[%s]", queryType.toString()); + } + } else { + return queryType; + } + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index b7340343c810..3cb49b7d05ea 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -43,7 +43,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -152,6 +154,7 @@ public class MultiStageQueryContext public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode"; public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = ArrayIngestMode.MVD; + public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification"; private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL); @@ -297,7 +300,7 @@ public static int getRowsInMemory(final QueryContext queryContext) public static List getSortOrder(final QueryContext queryContext) { - return MultiStageQueryContext.decodeSortOrder(queryContext.getString(CTX_SORT_ORDER)); + return decodeList(CTX_SORT_ORDER, queryContext.getString(CTX_SORT_ORDER)); } @Nullable @@ -316,37 +319,39 @@ public static ArrayIngestMode getArrayIngestMode(final QueryContext queryContext return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, DEFAULT_ARRAY_INGEST_MODE); } + public static Set getColumnsExcludedFromTypeVerification(final QueryContext queryContext) + { + return new HashSet<>(decodeList(CTX_SKIP_TYPE_VERIFICATION, queryContext.getString(CTX_SKIP_TYPE_VERIFICATION))); + } + /** - * Decodes {@link #CTX_SORT_ORDER} from either a JSON or CSV string. + * Decodes a list from either a JSON or CSV string. */ - @Nullable @VisibleForTesting - static List decodeSortOrder(@Nullable final String sortOrderString) + static List decodeList(final String keyName, @Nullable final String listString) { - if (sortOrderString == null) { + if (listString == null) { return Collections.emptyList(); - } else if (LOOKS_LIKE_JSON_ARRAY.matcher(sortOrderString).matches()) { + } else if (LOOKS_LIKE_JSON_ARRAY.matcher(listString).matches()) { try { // Not caching this ObjectMapper in a static, because we expect to use it infrequently (once per INSERT // query that uses this feature) and there is no need to keep it around longer than that. - return new ObjectMapper().readValue(sortOrderString, new TypeReference>() - { - }); + return new ObjectMapper().readValue(listString, new TypeReference>() {}); } catch (JsonProcessingException e) { - throw QueryContexts.badValueException(CTX_SORT_ORDER, "CSV or JSON array", sortOrderString); + throw QueryContexts.badValueException(keyName, "CSV or JSON array", listString); } } else { final RFC4180Parser csvParser = new RFC4180ParserBuilder().withSeparator(',').build(); try { - return Arrays.stream(csvParser.parseLine(sortOrderString)) + return Arrays.stream(csvParser.parseLine(listString)) .filter(s -> s != null && !s.isEmpty()) .map(String::trim) .collect(Collectors.toList()); } catch (IOException e) { - throw QueryContexts.badValueException(CTX_SORT_ORDER, "CSV or JSON array", sortOrderString); + throw QueryContexts.badValueException(keyName, "CSV or JSON array", listString); } } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java index c2b401c70db5..9f24a8b4331d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java @@ -314,7 +314,7 @@ public void testUseConcurrentLocks() private static List decodeSortOrder(@Nullable final String input) { - return MultiStageQueryContext.decodeSortOrder(input); + return MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input); } private static IndexSpec decodeIndexSpec(@Nullable final Object inputSpecObject) From 083f73665d66a3735c2c90d6eeb5ea8528fedca9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 17 Feb 2024 09:27:09 -0800 Subject: [PATCH 2/8] Fixes. --- .../druid/msq/util/DimensionSchemaUtils.java | 31 ++++++++++++++----- .../apache/druid/msq/exec/MSQInsertTest.java | 2 +- .../msq/util/DimensionSchemaUtilsTest.java | 6 ++-- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java index 38da3cd2e2d4..07a518216168 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java @@ -57,9 +57,19 @@ public static DimensionSchema createDimensionSchemaForExtern(final String column ); } + /** + * Create a dimension schema for a dimension column, given the type that it was assigned in the query, and the + * current values of {@link MultiStageQueryContext#CTX_USE_AUTO_SCHEMAS} and + * {@link MultiStageQueryContext#CTX_ARRAY_INGEST_MODE}. + * + * @param column column name + * @param queryType type of the column from the query + * @param useAutoType active value of {@link MultiStageQueryContext#CTX_USE_AUTO_SCHEMAS} + * @param arrayIngestMode active value of {@link MultiStageQueryContext#CTX_ARRAY_INGEST_MODE} + */ public static DimensionSchema createDimensionSchema( final String column, - @Nullable final ColumnType type, + @Nullable final ColumnType queryType, boolean useAutoType, ArrayIngestMode arrayIngestMode ) @@ -67,21 +77,28 @@ public static DimensionSchema createDimensionSchema( if (useAutoType) { // for complex types that are not COMPLEX, we still want to use the handler since 'auto' typing // only works for the 'standard' built-in types - if (type != null && type.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(type)) { - final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type); + if (queryType != null && queryType.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(queryType)) { + final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(queryType); return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null) .getDimensionSchema(capabilities); } - if (type != null && (type.isPrimitive() || type.isPrimitiveArray())) { - return new AutoTypeColumnSchema(column, type); + if (queryType != null && (queryType.isPrimitive() || queryType.isPrimitiveArray())) { + return new AutoTypeColumnSchema(column, queryType); } return new AutoTypeColumnSchema(column, null); } else { - final ColumnType dimensionType = getDimensionType(type, arrayIngestMode); + // dimensionType may not be identical to queryType, depending on arrayIngestMode. + final ColumnType dimensionType = getDimensionType(queryType, arrayIngestMode); if (dimensionType.getType() == ValueType.STRING) { - return new StringDimensionSchema(column); + return new StringDimensionSchema( + column, + queryType != null && queryType.isArray() + ? DimensionSchema.MultiValueHandling.ARRAY + : DimensionSchema.MultiValueHandling.SORTED_ARRAY, + null + ); } else if (dimensionType.getType() == ValueType.LONG) { return new LongDimensionSchema(column); } else if (dimensionType.getType() == ValueType.FLOAT) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index ebee2e042a21..6ce1785672fb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -1215,7 +1215,7 @@ public void testInsertWrongTypeTimestamp() DruidException.Persona.USER, DruidException.Category.INVALID_INPUT, "invalidInput" - ).expectMessageIs("Field [__time] was the wrong type [VARCHAR], expected TIMESTAMP") + ).expectMessageIs("Field[__time] was the wrong type[VARCHAR], expected TIMESTAMP") ) .verifyPlanningErrors(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/DimensionSchemaUtilsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/DimensionSchemaUtilsTest.java index a82f5a35f9c0..0a4e3ddbd814 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/DimensionSchemaUtilsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/DimensionSchemaUtilsTest.java @@ -179,19 +179,19 @@ public void testSchemaMvdMode() DruidException.class, () -> DimensionSchemaUtils.createDimensionSchema("x", ColumnType.LONG_ARRAY, false, ArrayIngestMode.MVD) ); - Assert.assertEquals("Numeric arrays can only be ingested when 'arrayIngestMode' is set to 'array' in the MSQ query's context. Current value of the parameter [mvd]", t.getMessage()); + Assert.assertEquals("Numeric arrays can only be ingested when 'arrayIngestMode' is set to 'array'. Current value of the parameter is[mvd]", t.getMessage()); t = Assert.assertThrows( DruidException.class, () -> DimensionSchemaUtils.createDimensionSchema("x", ColumnType.DOUBLE_ARRAY, false, ArrayIngestMode.MVD) ); - Assert.assertEquals("Numeric arrays can only be ingested when 'arrayIngestMode' is set to 'array' in the MSQ query's context. Current value of the parameter [mvd]", t.getMessage()); + Assert.assertEquals("Numeric arrays can only be ingested when 'arrayIngestMode' is set to 'array'. Current value of the parameter is[mvd]", t.getMessage()); t = Assert.assertThrows( DruidException.class, () -> DimensionSchemaUtils.createDimensionSchema("x", ColumnType.FLOAT_ARRAY, false, ArrayIngestMode.MVD) ); - Assert.assertEquals("Numeric arrays can only be ingested when 'arrayIngestMode' is set to 'array' in the MSQ query's context. Current value of the parameter [mvd]", t.getMessage()); + Assert.assertEquals("Numeric arrays can only be ingested when 'arrayIngestMode' is set to 'array'. Current value of the parameter is[mvd]", t.getMessage()); } @Test From edf2843612bccb85b48712e94059d28e99d5bf81 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 12 Mar 2024 22:49:07 -0700 Subject: [PATCH 3/8] Tests and docs and error messages. --- docs/multi-stage-query/reference.md | 1 + docs/querying/arrays.md | 55 ++++- docs/querying/multi-value-dimensions.md | 4 +- .../apache/druid/msq/exec/ControllerImpl.java | 3 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 26 ++- .../apache/druid/msq/exec/MSQArraysTest.java | 161 ++++++++++++- .../druid/msq/test/CalciteMSQTestsHelper.java | 4 +- .../sql/calcite/CalciteArraysQueryTest.java | 211 +++--------------- .../druid/sql/calcite/util/CalciteTests.java | 1 + .../sql/calcite/util/TestDataBuilder.java | 34 +++ 10 files changed, 301 insertions(+), 199 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 45dfa464416f..26fada1fb3e1 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -364,6 +364,7 @@ The following table lists the context parameters for the MSQ task engine: | `waitUntilSegmentsLoad` | INSERT, REPLACE

If set, the ingest query waits for the generated segment to be loaded before exiting, else the ingest query exits without waiting. The task and live reports contain the information about the status of loading segments if this flag is set. This will ensure that any future queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall till the segments are loaded. | `false` | | `includeSegmentSource` | SELECT, INSERT, REPLACE

Controls the sources, which will be queried for results in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If this value is `NONE`, only non-realtime (published and used) segments will be downloaded from deep storage. If this value is `REALTIME`, results will also be included from realtime tasks. | `NONE` | | `rowsPerPage` | SELECT

The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.
This property comes into effect only when `selectDestination` is set to `durableStorage` | 100000 | +| `skipTypeVerification` | INSERT or REPLACE

During query validation, Druid validates that [string arrays](../querying/arrays.md) and [multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed in the same column. If you are intentionally migrating from one to the other, use this context parameter to disable type validation.

Provide the column list as comma-separated values or as a JSON array in string form.| empty list | | `failOnEmptyInsert` | INSERT or REPLACE

When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` | ## Joins diff --git a/docs/querying/arrays.md b/docs/querying/arrays.md index dbeb3ec6e028..b35143beae13 100644 --- a/docs/querying/arrays.md +++ b/docs/querying/arrays.md @@ -71,9 +71,39 @@ The following shows an example `dimensionsSpec` for native ingestion of the data ### SQL-based ingestion -Arrays can also be inserted with [SQL-based ingestion](../multi-stage-query/index.md) when you include a query context parameter [`"arrayIngestMode":"array"`](../multi-stage-query/reference.md#context-parameters). +#### `arrayIngestMode` + +Arrays can be inserted with [SQL-based ingestion](../multi-stage-query/index.md) when you include the query context +parameter `arrayIngestMode: array`. + +When `arrayIngestMode` is `array`, SQL ARRAY types are stored using Druid array columns. + +When `arrayIngestMode` is `mvd`, SQL `VARCHAR ARRAY` are implicitly wrapped in [`ARRAY_TO_MV`](sql-functions.md#array-to-mv). +This causes them to be stored as [multi-value strings](multi-value-dimensions.md), using the same `STRING` column type +as regular scalar strings. SQL `BIGINT ARRAY` and `DOUBLE ARRAY` cannot be loaded under `arrayIngestMode: mvd`. + +The following table summarizes the differences in SQL ARRAY handling between `arrayIngestMode: array` and +`arrayIngestMode: mvd`. + +| SQL type | Stored type when `arrayIngestMode: array` | Stored type when `arrayIngestMode: mvd` (default) | +|---|---|---| +|`VARCHAR ARRAY`|`ARRAY`|[multi-value `STRING`](multi-value-dimensions.md)| +|`BIGINT ARRAY`|`ARRAY`|not possible (validation error)| +|`DOUBLE ARRAY`|`ARRAY`|not possible (validation error)| + +In either mode, you can explicitly wrap string arrays in `ARRAY_TO_MV` to cause them to be stored as +[multi-value strings](multi-value-dimensions.md). + +When validating a SQL INSERT or REPLACE statement that contains arrays, Druid checks whether the statement would lead +to mixing string arrays and multi-value strings in the same column. If this condition is detected, the statement fails +validation unless the column is named under the `skipTypeVerification` context parameter. This parameter can be either +a comma-separated list of column names, or a JSON array in string form. This validation is done to prevent accidentally +mixing arrays and multi-value strings in the same column. + +#### Examples + +Set [`arrayIngestMode: array`](#arrayIngestMode) in your query context to run the following examples. -For example, to insert the data used in this document: ```sql REPLACE INTO "array_example" OVERWRITE ALL WITH "ext" AS ( @@ -81,9 +111,14 @@ WITH "ext" AS ( FROM TABLE( EXTERN( '{"type":"inline","data":"{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row1\", \"arrayString\": [\"a\", \"b\"], \"arrayLong\":[1, null,3], \"arrayDouble\":[1.1, 2.2, null]}\n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row2\", \"arrayString\": [null, \"b\"], \"arrayLong\":null, \"arrayDouble\":[999, null, 5.5]}\n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row3\", \"arrayString\": [], \"arrayLong\":[1, 2, 3], \"arrayDouble\":[null, 2.2, 1.1]} \n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row4\", \"arrayString\": [\"a\", \"b\"], \"arrayLong\":[1, 2, 3], \"arrayDouble\":[]}\n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row5\", \"arrayString\": null, \"arrayLong\":[], \"arrayDouble\":null}"}', - '{"type":"json"}', - '[{"name":"timestamp", "type":"STRING"},{"name":"label", "type":"STRING"},{"name":"arrayString", "type":"ARRAY"},{"name":"arrayLong", "type":"ARRAY"},{"name":"arrayDouble", "type":"ARRAY"}]' + '{"type":"json"}' ) + ) EXTEND ( + "timestamp" VARCHAR, + "label" VARCHAR, + "arrayString" VARCHAR ARRAY, + "arrayLong" BIGINT ARRAY, + "arrayDouble" DOUBLE ARRAY ) ) SELECT @@ -96,8 +131,7 @@ FROM "ext" PARTITIONED BY DAY ``` -### SQL-based ingestion with rollup -These input arrays can also be grouped for rollup: +Arrays can also be used as `GROUP BY` keys for rollup: ```sql REPLACE INTO "array_example_rollup" OVERWRITE ALL @@ -106,9 +140,14 @@ WITH "ext" AS ( FROM TABLE( EXTERN( '{"type":"inline","data":"{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row1\", \"arrayString\": [\"a\", \"b\"], \"arrayLong\":[1, null,3], \"arrayDouble\":[1.1, 2.2, null]}\n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row2\", \"arrayString\": [null, \"b\"], \"arrayLong\":null, \"arrayDouble\":[999, null, 5.5]}\n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row3\", \"arrayString\": [], \"arrayLong\":[1, 2, 3], \"arrayDouble\":[null, 2.2, 1.1]} \n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row4\", \"arrayString\": [\"a\", \"b\"], \"arrayLong\":[1, 2, 3], \"arrayDouble\":[]}\n{\"timestamp\": \"2023-01-01T00:00:00\", \"label\": \"row5\", \"arrayString\": null, \"arrayLong\":[], \"arrayDouble\":null}"}', - '{"type":"json"}', - '[{"name":"timestamp", "type":"STRING"},{"name":"label", "type":"STRING"},{"name":"arrayString", "type":"ARRAY"},{"name":"arrayLong", "type":"ARRAY"},{"name":"arrayDouble", "type":"ARRAY"}]' + '{"type":"json"}' ) + ) EXTEND ( + "timestamp" VARCHAR, + "label" VARCHAR, + "arrayString" VARCHAR ARRAY, + "arrayLong" BIGINT ARRAY, + "arrayDouble" DOUBLE ARRAY ) ) SELECT diff --git a/docs/querying/multi-value-dimensions.md b/docs/querying/multi-value-dimensions.md index 2b33737a36fc..1ce3a618dac7 100644 --- a/docs/querying/multi-value-dimensions.md +++ b/docs/querying/multi-value-dimensions.md @@ -507,9 +507,9 @@ Avoid confusing string arrays with [multi-value dimensions](multi-value-dimensio Use care during ingestion to ensure you get the type you want. -To get arrays when performing an ingestion using JSON ingestion specs, such as [native batch](../ingestion/native-batch.md) or streaming ingestion such as with [Apache Kafka](../ingestion/kafka-ingestion.md), use dimension type `auto` or enable `useSchemaDiscovery`. When performing a [SQL-based ingestion](../multi-stage-query/index.md), write a query that generates arrays and set the context parameter `"arrayIngestMode": "array"`. Arrays may contain strings or numbers. +To get arrays when performing an ingestion using JSON ingestion specs, such as [native batch](../ingestion/native-batch.md) or streaming ingestion such as with [Apache Kafka](../ingestion/kafka-ingestion.md), use dimension type `auto` or enable `useSchemaDiscovery`. When performing a [SQL-based ingestion](../multi-stage-query/index.md), write a query that generates arrays and set the context parameter [`"arrayIngestMode": "array"`](arrays.md#arrayingestmode). Arrays may contain strings or numbers. -To get multi-value dimensions when performing an ingestion using JSON ingestion specs, use dimension type `string` and do not enable `useSchemaDiscovery`. When performing a [SQL-based ingestion](../multi-stage-query/index.md), wrap arrays in [`ARRAY_TO_MV`](multi-value-dimensions.md#sql-based-ingestion), which ensures you get multi-value dimensions in any `arrayIngestMode`. Multi-value dimensions can only contain strings. +To get multi-value dimensions when performing an ingestion using JSON ingestion specs, use dimension type `string` and do not enable `useSchemaDiscovery`. When performing a [SQL-based ingestion](../multi-stage-query/index.md), wrap arrays in [`ARRAY_TO_MV`](multi-value-dimensions.md#sql-based-ingestion), which ensures you get multi-value dimensions in any [`arrayIngestMode`](arrays.md#arrayingestmode). Multi-value dimensions can only contain strings. You can tell which type you have by checking the `INFORMATION_SCHEMA.COLUMNS` table, using a query like: diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b9e06a7b83d5..150397957d1f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -2136,8 +2136,7 @@ private static Pair, List> makeDimensio + "compatible with future behavior changes, we recommend setting %s to[array], which creates a clearer " + "separation between multi-value strings and true arrays. In either[mvd] or[array] mode, you can write " + "out multi-value string dimensions using ARRAY_TO_MV. " - + "See https://druid.apache.org/docs/latest/querying/arrays" - + "#differences-between-arrays-and-multi-value-dimensions for more details.", + + "See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode for more details.", MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, MultiStageQueryContext.CTX_ARRAY_INGEST_MODE ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 0951e05e129b..de5700c9ae51 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -391,20 +391,32 @@ private static void validateTypeChanges( if (newDruidType.is(ValueType.STRING) && newSqlType.getSqlTypeName() == SqlTypeName.ARRAY && arrayIngestMode == ArrayIngestMode.MVD) { - // Tried to insert an ARRAY, which got turned into a STRING by arrayIngestMode: mvd. + // Tried to insert a SQL ARRAY, which got turned into a STRING by arrayIngestMode: mvd. messageBuilder.append(". Try setting arrayIngestMode to[array] to retain the SQL type[") .append(newSqlType) .append("]"); - } - - if (newDruidType.is(ValueType.ARRAY) - && oldDruidType.is(ValueType.STRING) - && arrayIngestMode == ArrayIngestMode.ARRAY) { - // Tried to insert an ARRAY, which stayed an ARRAY, but wasn't compatible with existing STRING. + } else if (newDruidType.is(ValueType.ARRAY) + && oldDruidType.is(ValueType.STRING) + && arrayIngestMode == ArrayIngestMode.ARRAY) { + // Tried to insert a SQL ARRAY, which stayed an ARRAY, but wasn't compatible with existing STRING. messageBuilder.append(". Try wrapping this field using ARRAY_TO_MV(...) AS ") .append(CalciteSqlDialect.DEFAULT.quoteIdentifier(columnName)); + } else if (newDruidType.is(ValueType.STRING) && oldDruidType.is(ValueType.ARRAY)) { + // Tried to insert a SQL VARCHAR, but wasn't compatible with existing ARRAY. + messageBuilder.append(". Try"); + if (arrayIngestMode == ArrayIngestMode.MVD) { + messageBuilder.append(" setting arrayIngestMode to[array] and"); + } + messageBuilder.append(" adjusting your query to make this column an ARRAY instead of VARCHAR"); } + messageBuilder.append(". You can override this check by setting the context parameter '") + .append(MultiStageQueryContext.CTX_SKIP_TYPE_VERIFICATION) + .append("' to[") + .append(columnName) + .append("]. See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode " + + "for more details."); + throw InvalidSqlInput.exception(StringUtils.encodeForFormat(messageBuilder.toString())); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java index 456c74c29bc7..7096b840a236 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java @@ -25,6 +25,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.systemfield.SystemFields; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.indexing.MSQSpec; @@ -150,6 +151,162 @@ public void testInsertStringArrayWithArrayIngestModeNone() .verifyExecutionError(); } + /** + * Tests the behaviour of INSERT query when arrayIngestMode is set to none (default) and the user tries to ingest + * string arrays + */ + @Test + public void testReplaceMvdWithStringArray() + { + final Map adjustedContext = new HashMap<>(context); + adjustedContext.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array"); + + testIngestQuery() + .setSql( + "REPLACE INTO foo OVERWRITE ALL\n" + + "SELECT MV_TO_ARRAY(dim3) AS dim3 FROM foo\n" + + "PARTITIONED BY ALL TIME" + ) + .setQueryContext(adjustedContext) + .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(DruidException.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( + "Cannot write into field[dim3] using type[VARCHAR ARRAY] and arrayIngestMode[array], " + + "since the existing type is[VARCHAR]")) + )) + .verifyExecutionError(); + } + + /** + * Tests the behaviour of INSERT query when arrayIngestMode is set to none (default) and the user tries to ingest + * string arrays + */ + @Test + public void testReplaceStringArrayWithMvdInArrayMode() + { + final Map adjustedContext = new HashMap<>(context); + adjustedContext.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array"); + + testIngestQuery() + .setSql( + "REPLACE INTO arrays OVERWRITE ALL\n" + + "SELECT ARRAY_TO_MV(arrayString) AS arrayString FROM arrays\n" + + "PARTITIONED BY ALL TIME" + ) + .setQueryContext(adjustedContext) + .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(DruidException.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( + "Cannot write into field[arrayString] using type[VARCHAR] and arrayIngestMode[array], since the " + + "existing type is[VARCHAR ARRAY]. Try adjusting your query to make this column an ARRAY instead " + + "of VARCHAR.")) + )) + .verifyExecutionError(); + } + + /** + * Tests the behaviour of INSERT query when arrayIngestMode is set to none (default) and the user tries to ingest + * string arrays + */ + @Test + public void testReplaceStringArrayWithMvdInMvdMode() + { + final Map adjustedContext = new HashMap<>(context); + adjustedContext.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "mvd"); + + testIngestQuery() + .setSql( + "REPLACE INTO arrays OVERWRITE ALL\n" + + "SELECT ARRAY_TO_MV(arrayString) AS arrayString FROM arrays\n" + + "PARTITIONED BY ALL TIME" + ) + .setQueryContext(adjustedContext) + .setExpectedExecutionErrorMatcher(CoreMatchers.allOf( + CoreMatchers.instanceOf(DruidException.class), + ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith( + "Cannot write into field[arrayString] using type[VARCHAR] and arrayIngestMode[mvd], since the " + + "existing type is[VARCHAR ARRAY]. Try setting arrayIngestMode to[array] and adjusting your query to " + + "make this column an ARRAY instead of VARCHAR.")) + )) + .verifyExecutionError(); + } + + /** + * Tests the behaviour of INSERT query when arrayIngestMode is set to none (default) and the user tries to ingest + * string arrays + */ + @Test + public void testReplaceMvdWithStringArraySkipValidation() + { + final Map adjustedContext = new HashMap<>(context); + adjustedContext.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array"); + adjustedContext.put(MultiStageQueryContext.CTX_SKIP_TYPE_VERIFICATION, "dim3"); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim3", ColumnType.STRING_ARRAY) + .build(); + + testIngestQuery() + .setSql( + "REPLACE INTO foo OVERWRITE ALL\n" + + "SELECT MV_TO_ARRAY(dim3) AS dim3 FROM foo\n" + + "PARTITIONED BY ALL TIME" + ) + .setQueryContext(adjustedContext) + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{0L, null}, + new Object[]{0L, null}, + new Object[]{0L, new Object[]{"a", "b"}}, + new Object[]{0L, new Object[]{""}}, + new Object[]{0L, new Object[]{"b", "c"}}, + new Object[]{0L, new Object[]{"d"}} + ) + ) + .verifyResults(); + } + + /** + * Tests the behaviour of INSERT query when arrayIngestMode is set to none (default) and the user tries to ingest + * string arrays + */ + @Test + public void testReplaceMvdWithMvd() + { + final Map adjustedContext = new HashMap<>(context); + adjustedContext.put(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array"); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim3", ColumnType.STRING) + .build(); + + testIngestQuery() + .setSql( + "REPLACE INTO foo OVERWRITE ALL\n" + + "SELECT dim3 FROM foo\n" + + "PARTITIONED BY ALL TIME" + ) + .setQueryContext(adjustedContext) + .setExpectedDataSource("foo") + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{0L, null}, + new Object[]{0L, null}, + new Object[]{0L, ""}, + new Object[]{0L, ImmutableList.of("a", "b")}, + new Object[]{0L, ImmutableList.of("b", "c")}, + new Object[]{0L, "d"} + ) + ) + .verifyResults(); + } /** * Tests the behaviour of INSERT query when arrayIngestMode is set to mvd (default) and the only array type to be @@ -475,7 +632,7 @@ public void testSelectOnArrays(String arrayIngestMode) null, Arrays.asList(3.3d, 4.4d, 5.5d), Arrays.asList(999.0d, null, 5.5d), - }, + }, new Object[]{ 1672531200000L, Arrays.asList("b", "c"), @@ -583,7 +740,7 @@ public void testSelectOnArrays(String arrayIngestMode) Arrays.asList(2L, 3L), null, Arrays.asList(null, 1.1d), - } + } ); RowSignature rowSignatureWithoutTimeColumn = diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 3146341faf94..bbacca1e54a2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -76,7 +76,6 @@ import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; -import org.apache.druid.sql.calcite.CalciteArraysQueryTest; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.timeline.DataSegment; @@ -92,6 +91,7 @@ import java.util.Set; import java.util.function.Supplier; +import static org.apache.druid.sql.calcite.util.CalciteTests.ARRAYS_DATASOURCE; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3; @@ -282,7 +282,7 @@ private static Supplier> getSupplierForSegment(SegmentId .rows(ROWS_LOTS_OF_COLUMNS) .buildMMappedIndex(); break; - case CalciteArraysQueryTest.DATA_SOURCE_ARRAYS: + case ARRAYS_DATASOURCE: index = IndexBuilder.create() .tmpDir(temporaryFolder.newFolder()) .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index ce1f992a2ad2..3435cb8a1268 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -22,9 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.inject.Injector; import org.apache.druid.common.config.NullHandling; -import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.guice.NestedDataModule; import org.apache.druid.java.util.common.HumanReadableBytes; @@ -34,17 +32,13 @@ import org.apache.druid.math.expr.ExprEval; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.ExpressionType; -import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; import org.apache.druid.query.FilteredDataSource; -import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.LookupDataSource; -import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; -import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -65,37 +59,20 @@ import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.NoopLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; -import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.topn.DimensionTopNMetricSpec; import org.apache.druid.query.topn.TopNQueryBuilder; -import org.apache.druid.segment.FrameBasedInlineSegmentWrangler; -import org.apache.druid.segment.IndexBuilder; -import org.apache.druid.segment.InlineSegmentWrangler; -import org.apache.druid.segment.LookupSegmentWrangler; -import org.apache.druid.segment.MapSegmentWrangler; -import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.JoinType; -import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.virtual.ExpressionVirtualColumn; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; -import org.apache.druid.server.QueryStackTests; -import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.util.CalciteTests; -import org.apache.druid.sql.calcite.util.TestDataBuilder; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.LinearShardSpec; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -112,9 +89,6 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false) .build(); - - public static final String DATA_SOURCE_ARRAYS = "arrays"; - public static void assertResultsDeepEquals(String sql, List expected, List results) { for (int row = 0; row < results.size(); row++) { @@ -146,121 +120,6 @@ public void configureGuice(DruidInjectorBuilder builder) builder.addModule(new NestedDataModule()); } - @SuppressWarnings("resource") - @Override - public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( - final QueryRunnerFactoryConglomerate conglomerate, - final JoinableFactoryWrapper joinableFactory, - final Injector injector - ) throws IOException - { - NestedDataModule.registerHandlersAndSerde(); - - final QueryableIndex foo = IndexBuilder - .create() - .tmpDir(temporaryFolder.newFolder()) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(TestDataBuilder.INDEX_SCHEMA) - .rows(TestDataBuilder.ROWS1) - .buildMMappedIndex(); - - final QueryableIndex numfoo = IndexBuilder - .create() - .tmpDir(temporaryFolder.newFolder()) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS) - .rows(TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS) - .buildMMappedIndex(); - - final QueryableIndex indexLotsOfColumns = IndexBuilder - .create() - .tmpDir(temporaryFolder.newFolder()) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS) - .rows(TestDataBuilder.ROWS_LOTS_OF_COLUMNS) - .buildMMappedIndex(); - - final QueryableIndex indexArrays = - IndexBuilder.create() - .tmpDir(temporaryFolder.newFolder()) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) - .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withRollup(false) - .build() - ) - .inputSource( - ResourceInputSource.of( - NestedDataTestUtils.class.getClassLoader(), - NestedDataTestUtils.ARRAY_TYPES_DATA_FILE - ) - ) - .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) - .inputTmpDir(temporaryFolder.newFolder()) - .buildMMappedIndex(); - - SpecificSegmentsQuerySegmentWalker walker = SpecificSegmentsQuerySegmentWalker.createWalker( - injector, - conglomerate, - new MapSegmentWrangler( - ImmutableMap., SegmentWrangler>builder() - .put(InlineDataSource.class, new InlineSegmentWrangler()) - .put(FrameBasedInlineDataSource.class, new FrameBasedInlineSegmentWrangler()) - .put( - LookupDataSource.class, - new LookupSegmentWrangler(injector.getInstance(LookupExtractorFactoryContainerProvider.class)) - ) - .build() - ), - joinableFactory, - QueryStackTests.DEFAULT_NOOP_SCHEDULER - ); - walker.add( - DataSegment.builder() - .dataSource(CalciteTests.DATASOURCE1) - .interval(foo.getDataInterval()) - .version("1") - .shardSpec(new LinearShardSpec(0)) - .size(0) - .build(), - foo - ).add( - DataSegment.builder() - .dataSource(CalciteTests.DATASOURCE3) - .interval(numfoo.getDataInterval()) - .version("1") - .shardSpec(new LinearShardSpec(0)) - .size(0) - .build(), - numfoo - ).add( - DataSegment.builder() - .dataSource(CalciteTests.DATASOURCE5) - .interval(indexLotsOfColumns.getDataInterval()) - .version("1") - .shardSpec(new LinearShardSpec(0)) - .size(0) - .build(), - indexLotsOfColumns - ).add( - DataSegment.builder() - .dataSource(DATA_SOURCE_ARRAYS) - .version("1") - .interval(indexArrays.getDataInterval()) - .shardSpec(new LinearShardSpec(1)) - .size(0) - .build(), - indexArrays - ); - - return walker; - } - // test some query stuffs, sort of limited since no native array column types so either need to use constructor or // array aggregator @Test @@ -323,7 +182,7 @@ public void testGroupByArrayColumnFromCase() QUERY_CONTEXT_NO_STRINGIFY_ARRAY, ImmutableList.of( GroupByQuery.builder() - .setDataSource(DATA_SOURCE_ARRAYS) + .setDataSource(CalciteTests.ARRAYS_DATASOURCE) .setInterval(querySegmentSpec(Filtration.eternity())) .setVirtualColumns(expressionVirtualColumn( "v0", @@ -648,7 +507,7 @@ public void testSomeArrayFunctionsWithScanQueryArrayColumns() + " FROM druid.arrays", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .virtualColumns( // these report as strings even though they are not, someday this will not be so @@ -864,7 +723,7 @@ public void testArrayOverlapFilterStringArrayColumn() "SELECT arrayStringNulls FROM druid.arrays WHERE ARRAY_OVERLAP(arrayStringNulls, ARRAY['a','b']) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( or( @@ -895,7 +754,7 @@ public void testArrayOverlapFilterLongArrayColumn() "SELECT arrayLongNulls FROM druid.arrays WHERE ARRAY_OVERLAP(arrayLongNulls, ARRAY[1, 2]) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( or( @@ -926,7 +785,7 @@ public void testArrayOverlapFilterDoubleArrayColumn() "SELECT arrayDoubleNulls FROM druid.arrays WHERE ARRAY_OVERLAP(arrayDoubleNulls, ARRAY[1.1, 2.2]) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( or( @@ -1004,7 +863,7 @@ public void testArrayOverlapFilterArrayStringColumns() "SELECT arrayStringNulls, arrayString FROM druid.arrays WHERE ARRAY_OVERLAP(arrayStringNulls, arrayString) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters(expressionFilter("array_overlap(\"arrayStringNulls\",\"arrayString\")")) .columns("arrayString", "arrayStringNulls") @@ -1030,7 +889,7 @@ public void testArrayOverlapFilterArrayLongColumns() "SELECT arrayLongNulls, arrayLong FROM druid.arrays WHERE ARRAY_OVERLAP(arrayLongNulls, arrayLong) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters(expressionFilter("array_overlap(\"arrayLongNulls\",\"arrayLong\")")) .columns("arrayLong", "arrayLongNulls") @@ -1056,7 +915,7 @@ public void testArrayOverlapFilterArrayDoubleColumns() "SELECT arrayDoubleNulls, arrayDouble FROM druid.arrays WHERE ARRAY_OVERLAP(arrayDoubleNulls, arrayDouble) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters(expressionFilter("array_overlap(\"arrayDoubleNulls\",\"arrayDouble\")")) .columns("arrayDouble", "arrayDoubleNulls") @@ -1108,7 +967,7 @@ public void testArrayContainsFilterArrayStringColumn() "SELECT arrayStringNulls FROM druid.arrays WHERE ARRAY_CONTAINS(arrayStringNulls, ARRAY['a','b']) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( and( @@ -1138,7 +997,7 @@ public void testArrayContainsFilterArrayLongColumn() "SELECT arrayLongNulls FROM druid.arrays WHERE ARRAY_CONTAINS(arrayLongNulls, ARRAY[1, null]) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( and( @@ -1166,7 +1025,7 @@ public void testArrayContainsFilterArrayDoubleColumn() "SELECT arrayDoubleNulls FROM druid.arrays WHERE ARRAY_CONTAINS(arrayDoubleNulls, ARRAY[1.1, null]) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( and( @@ -1276,7 +1135,7 @@ public void testArrayContainsFilterArrayStringColumns() "SELECT arrayStringNulls, arrayString FROM druid.arrays WHERE ARRAY_CONTAINS(arrayStringNulls, arrayString) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( expressionFilter("array_contains(\"arrayStringNulls\",\"arrayString\")") @@ -1300,7 +1159,7 @@ public void testArrayContainsFilterArrayLongColumns() "SELECT arrayLong, arrayLongNulls FROM druid.arrays WHERE ARRAY_CONTAINS(arrayLong, arrayLongNulls) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( expressionFilter("array_contains(\"arrayLong\",\"arrayLongNulls\")") @@ -1327,7 +1186,7 @@ public void testArrayContainsFilterArrayDoubleColumns() "SELECT arrayDoubleNulls, arrayDouble FROM druid.arrays WHERE ARRAY_CONTAINS(arrayDoubleNulls, arrayDouble) LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .filters( expressionFilter("array_contains(\"arrayDoubleNulls\",\"arrayDouble\")") @@ -1378,7 +1237,7 @@ public void testArraySliceArrayColumns() QUERY_CONTEXT_NO_STRINGIFY_ARRAY, ImmutableList.of( new Druids.ScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .virtualColumns( expressionVirtualColumn("v0", "array_slice(\"arrayString\",1)", ColumnType.STRING_ARRAY), @@ -1463,7 +1322,7 @@ public void testArrayLengthArrayColumn() "SELECT arrayStringNulls, ARRAY_LENGTH(arrayStringNulls), SUM(cnt) FROM druid.arrays GROUP BY 1, 2 ORDER BY 2 DESC", ImmutableList.of( GroupByQuery.builder() - .setDataSource(DATA_SOURCE_ARRAYS) + .setDataSource(CalciteTests.ARRAYS_DATASOURCE) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setVirtualColumns(expressionVirtualColumn("v0", "array_length(\"arrayStringNulls\")", ColumnType.LONG)) @@ -1843,7 +1702,7 @@ public void testArrayGroupAsLongArrayColumn() QUERY_CONTEXT_NO_STRINGIFY_ARRAY, ImmutableList.of( GroupByQuery.builder() - .setDataSource(DATA_SOURCE_ARRAYS) + .setDataSource(CalciteTests.ARRAYS_DATASOURCE) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions( @@ -1940,7 +1799,7 @@ public void testArrayGroupAsDoubleArrayColumn() QUERY_CONTEXT_NO_STRINGIFY_ARRAY, ImmutableList.of( GroupByQuery.builder() - .setDataSource(DATA_SOURCE_ARRAYS) + .setDataSource(CalciteTests.ARRAYS_DATASOURCE) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions( @@ -2984,7 +2843,7 @@ public void testArrayAggArrayColumns() "SELECT ARRAY_AGG(arrayLongNulls), ARRAY_AGG(DISTINCT arrayDouble), ARRAY_AGG(DISTINCT arrayStringNulls) FILTER(WHERE arrayLong = ARRAY[2,3]) FROM arrays WHERE arrayDoubleNulls is not null", ImmutableList.of( Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .granularity(Granularities.ALL) .filters(notNull("arrayDoubleNulls")) @@ -3068,7 +2927,7 @@ public void testArrayConcatAggArrayColumns() "SELECT ARRAY_CONCAT_AGG(arrayLongNulls), ARRAY_CONCAT_AGG(DISTINCT arrayDouble), ARRAY_CONCAT_AGG(DISTINCT arrayStringNulls) FILTER(WHERE arrayLong = ARRAY[2,3]) FROM arrays WHERE arrayDoubleNulls is not null", ImmutableList.of( Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .granularity(Granularities.ALL) .filters(notNull("arrayDoubleNulls")) @@ -3926,7 +3785,7 @@ public void testUnnestArrayColumnsString() ImmutableList.of( Druids.newScanQueryBuilder() .dataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayString\"", ColumnType.STRING_ARRAY), null )) @@ -3974,7 +3833,7 @@ public void testUnnestArrayColumnsStringNulls() ImmutableList.of( Druids.newScanQueryBuilder() .dataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayStringNulls\"", ColumnType.STRING_ARRAY), null )) @@ -4021,7 +3880,7 @@ public void testUnnestArrayColumnsLong() ImmutableList.of( Druids.newScanQueryBuilder() .dataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayLong\"", ColumnType.LONG_ARRAY), null )) @@ -4075,7 +3934,7 @@ public void testUnnestArrayColumnsLongNulls() ImmutableList.of( Druids.newScanQueryBuilder() .dataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayLongNulls\"", ColumnType.LONG_ARRAY), null )) @@ -4125,7 +3984,7 @@ public void testUnnestArrayColumnsDouble() ImmutableList.of( Druids.newScanQueryBuilder() .dataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayDouble\"", ColumnType.DOUBLE_ARRAY), null )) @@ -4179,7 +4038,7 @@ public void testUnnestArrayColumnsDoubleNulls() ImmutableList.of( Druids.newScanQueryBuilder() .dataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayDoubleNulls\"", ColumnType.DOUBLE_ARRAY), null )) @@ -4315,7 +4174,7 @@ public void testUnnestTwiceArrayColumns() .dataSource( UnnestDataSource.create( UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn( "j0.unnest", "\"arrayStringNulls\"", @@ -4632,7 +4491,7 @@ public void testUnnestThriceWithFiltersOnDimAndAllUnnestColumnsArrayColumns() UnnestDataSource.create( FilteredDataSource.create( UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn( "j0.unnest", "\"arrayLongNulls\"", @@ -4782,7 +4641,7 @@ public void testUnnestThriceWithFiltersOnDimAndAllUnnestColumnsArrayColumnsOrFil UnnestDataSource.create( FilteredDataSource.create( UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn( "j0.unnest", "\"arrayLongNulls\"", @@ -4893,7 +4752,7 @@ public void testUnnestWithGroupByArrayColumn() ImmutableList.of( GroupByQuery.builder() .setDataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayStringNulls\"", ColumnType.STRING_ARRAY), null )) @@ -6497,7 +6356,7 @@ public void testUnnestWithSumOnUnnestedArrayColumn() ImmutableList.of( Druids.newTimeseriesQueryBuilder() .dataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayDoubleNulls\"", ColumnType.DOUBLE_ARRAY), null )) @@ -6584,7 +6443,7 @@ public void testUnnestWithGroupByWithWhereOnUnnestArrayCol() ImmutableList.of( GroupByQuery.builder() .setDataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayLongNulls\"", ColumnType.LONG_ARRAY), NullHandling.sqlCompatible() ? or( @@ -6621,7 +6480,7 @@ public void testUnnestWithGroupByHavingWithWhereOnUnnestArrayCol() ImmutableList.of( GroupByQuery.builder() .setDataSource(UnnestDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), expressionVirtualColumn("j0.unnest", "\"arrayLongNulls\"", ColumnType.LONG_ARRAY), NullHandling.sqlCompatible() ? or( @@ -6739,7 +6598,7 @@ public void testUnnestWithTimeFilterOnlyArrayColumn() Druids.newScanQueryBuilder() .dataSource(UnnestDataSource.create( FilteredDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), range("__time", ColumnType.LONG, 1672617600000L, 1672704600000L, false, false) ), expressionVirtualColumn("j0.unnest", "\"arrayStringNulls\"", ColumnType.STRING_ARRAY), @@ -6998,7 +6857,7 @@ public void testUnnestWithTimeFilterInsideSubqueryArrayColumns() .dataSource( UnnestDataSource.create( FilteredDataSource.create( - new TableDataSource(DATA_SOURCE_ARRAYS), + new TableDataSource(CalciteTests.ARRAYS_DATASOURCE), range("__time", ColumnType.LONG, 1672617600000L, 1672704600000L, false, false) ), expressionVirtualColumn("j0.unnest", "\"arrayLongNulls\"", ColumnType.LONG_ARRAY), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index f7364ad6be76..2594d2e98dce 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -114,6 +114,7 @@ public class CalciteTests public static final String DATASOURCE3 = "numfoo"; public static final String DATASOURCE4 = "foo4"; public static final String DATASOURCE5 = "lotsocolumns"; + public static final String ARRAYS_DATASOURCE = "arrays"; public static final String BROADCAST_DATASOURCE = "broadcast"; public static final String FORBIDDEN_DATASOURCE = "forbiddenDatasource"; public static final String FORBIDDEN_DESTINATION = "forbiddenDestination"; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java index 868317681485..2303d51e02ab 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java @@ -42,6 +42,7 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -832,6 +833,30 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( .rows(USER_VISIT_ROWS) .buildMMappedIndex(); + final QueryableIndex arraysIndex = IndexBuilder + .create() + .tmpDir(new File(tmpDir, "9")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) + .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) + .withMetrics( + new CountAggregatorFactory("cnt") + ) + .withRollup(false) + .build() + ) + .inputSource( + ResourceInputSource.of( + NestedDataTestUtils.class.getClassLoader(), + NestedDataTestUtils.ARRAY_TYPES_DATA_FILE + ) + ) + .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) + .inputTmpDir(new File(tmpDir, "9-input")) + .buildMMappedIndex(); + return SpecificSegmentsQuerySegmentWalker.createWalker( injector, conglomerate, @@ -946,6 +971,15 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( .size(0) .build(), makeWikipediaIndexWithAggregation(tmpDir) + ).add( + DataSegment.builder() + .dataSource(CalciteTests.ARRAYS_DATASOURCE) + .version("1") + .interval(arraysIndex.getDataInterval()) + .shardSpec(new LinearShardSpec(1)) + .size(0) + .build(), + arraysIndex ); } From 09be4b6bbd2b87c3c4a9ea519689318d684c75c2 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 12 Mar 2024 22:54:57 -0700 Subject: [PATCH 4/8] More docs. --- docs/multi-stage-query/reference.md | 2 +- docs/querying/arrays.md | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 26fada1fb3e1..0b10e14b50f9 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -351,7 +351,7 @@ The following table lists the context parameters for the MSQ task engine: | `maxNumTasks` | SELECT, INSERT, REPLACE

The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.

May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority. | 2 | | `taskAssignment` | SELECT, INSERT, REPLACE

Determines how many tasks to use. Possible values include:
  • `max`: Uses as many tasks as possible, up to `maxNumTasks`.
  • `auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 512 MiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When calculating the size of files, the weighted size is used, which considers the file format and compression format used if any. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.
| `max` | | `finalizeAggregations` | SELECT, INSERT, REPLACE

Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | `true` | -| `arrayIngestMode` | INSERT, REPLACE

Controls how ARRAY type values are stored in Druid segments. When set to `array` (recommended for SQL compliance), Druid will store all ARRAY typed values in [ARRAY typed columns](../querying/arrays.md), and supports storing both VARCHAR and numeric typed arrays. When set to `mvd` (the default, for backwards compatibility), Druid only supports VARCHAR typed arrays, and will store them as [multi-value string columns](../querying/multi-value-dimensions.md). When set to `none`, Druid will throw an exception when trying to store any type of arrays. `none` is most useful when set in the system default query context with (`druid.query.default.context.arrayIngestMode=none`) to be used to help migrate operators from `mvd` mode to `array` mode and force query writers to make an explicit choice between ARRAY and multi-value VARCHAR typed columns. | `mvd` (for backwards compatibility, recommended to use `array` for SQL compliance)| +| `arrayIngestMode` | INSERT, REPLACE

Controls how ARRAY type values are stored in Druid segments. When set to `array` (recommended for SQL compliance), Druid will store all ARRAY typed values in [ARRAY typed columns](../querying/arrays.md), and supports storing both VARCHAR and numeric typed arrays. When set to `mvd` (the default, for backwards compatibility), Druid only supports VARCHAR typed arrays, and will store them as [multi-value string columns](../querying/multi-value-dimensions.md). See [`arrayIngestMode`] in the [Arrays](../querying/arrays.md) page for more details. | `mvd` (for backwards compatibility, recommended to use `array` for SQL compliance)| | `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE

Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. This is a hint to the MSQ engine and the actual joins in the query may proceed in a different way than specified. See [Joins](#joins) for more details. | `broadcast` | | `rowsInMemory` | INSERT or REPLACE

Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 | | `segmentSortOrder` | INSERT or REPLACE

Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.

You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list | diff --git a/docs/querying/arrays.md b/docs/querying/arrays.md index b35143beae13..37fd8a99b11f 100644 --- a/docs/querying/arrays.md +++ b/docs/querying/arrays.md @@ -76,11 +76,18 @@ The following shows an example `dimensionsSpec` for native ingestion of the data Arrays can be inserted with [SQL-based ingestion](../multi-stage-query/index.md) when you include the query context parameter `arrayIngestMode: array`. -When `arrayIngestMode` is `array`, SQL ARRAY types are stored using Druid array columns. +When `arrayIngestMode` is `array`, SQL ARRAY types are stored using Druid array columns. This is recommended for new +tables. When `arrayIngestMode` is `mvd`, SQL `VARCHAR ARRAY` are implicitly wrapped in [`ARRAY_TO_MV`](sql-functions.md#array-to-mv). This causes them to be stored as [multi-value strings](multi-value-dimensions.md), using the same `STRING` column type -as regular scalar strings. SQL `BIGINT ARRAY` and `DOUBLE ARRAY` cannot be loaded under `arrayIngestMode: mvd`. +as regular scalar strings. SQL `BIGINT ARRAY` and `DOUBLE ARRAY` cannot be loaded under `arrayIngestMode: mvd`. This +is the default behavior when `arrayIngestMode` is not provided in your query context, although the default behavior +may change to `array` in a future release. + +When `arrayIngestMode` is `none`, Druid throws an exception when trying to store any type of arrays. This mode is most +useful when set in the system default query context with `druid.query.default.context.arrayIngestMode = none`, in cases +where the cluster administrator wants SQL query authors to explicitly provide one or the other in their query context. The following table summarizes the differences in SQL ARRAY handling between `arrayIngestMode: array` and `arrayIngestMode: mvd`. From 9f8f82751cd9478648addd2316e97103d2c58594 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 12 Mar 2024 23:10:02 -0700 Subject: [PATCH 5/8] Adjustments. --- docs/multi-stage-query/concepts.md | 4 ++-- docs/querying/arrays.md | 4 ++-- .../java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 6 +----- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/docs/multi-stage-query/concepts.md b/docs/multi-stage-query/concepts.md index 27b7d12c91c9..cae88a0f3750 100644 --- a/docs/multi-stage-query/concepts.md +++ b/docs/multi-stage-query/concepts.md @@ -200,8 +200,8 @@ To perform ingestion with rollup: 2. Set [`finalizeAggregations: false`](reference.md#context-parameters) in your context. This causes aggregation functions to write their internal state to the generated segments, instead of the finalized end result, and enables further aggregation at query time. -3. See [ARRAY types](../querying/arrays.md#sql-based-ingestion-with-rollup) for information about ingesting `ARRAY` columns -4. See [multi-value dimensions](../querying/multi-value-dimensions.md#sql-based-ingestion-with-rollup) for information to ingest multi-value VARCHAR columns +3. See [ARRAY types](../querying/arrays.md#sql-based-ingestion) for information about ingesting `ARRAY` columns +4. See [multi-value dimensions](../querying/multi-value-dimensions.md#sql-based-ingestion) for information to ingest multi-value VARCHAR columns When you do all of these things, Druid understands that you intend to do an ingestion with rollup, and it writes rollup-related metadata into the generated segments. Other applications can then use [`segmentMetadata` diff --git a/docs/querying/arrays.md b/docs/querying/arrays.md index 37fd8a99b11f..a7eebaa32afe 100644 --- a/docs/querying/arrays.md +++ b/docs/querying/arrays.md @@ -79,7 +79,7 @@ parameter `arrayIngestMode: array`. When `arrayIngestMode` is `array`, SQL ARRAY types are stored using Druid array columns. This is recommended for new tables. -When `arrayIngestMode` is `mvd`, SQL `VARCHAR ARRAY` are implicitly wrapped in [`ARRAY_TO_MV`](sql-functions.md#array-to-mv). +When `arrayIngestMode` is `mvd`, SQL `VARCHAR ARRAY` are implicitly wrapped in [`ARRAY_TO_MV`](sql-functions.md#array_to_mv). This causes them to be stored as [multi-value strings](multi-value-dimensions.md), using the same `STRING` column type as regular scalar strings. SQL `BIGINT ARRAY` and `DOUBLE ARRAY` cannot be loaded under `arrayIngestMode: mvd`. This is the default behavior when `arrayIngestMode` is not provided in your query context, although the default behavior @@ -109,7 +109,7 @@ mixing arrays and multi-value strings in the same column. #### Examples -Set [`arrayIngestMode: array`](#arrayIngestMode) in your query context to run the following examples. +Set [`arrayIngestMode: array`](#arrayingestmode) in your query context to run the following examples. ```sql REPLACE INTO "array_example" OVERWRITE ALL diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index de5700c9ae51..1a81d28b7c39 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -410,11 +410,7 @@ private static void validateTypeChanges( messageBuilder.append(" adjusting your query to make this column an ARRAY instead of VARCHAR"); } - messageBuilder.append(". You can override this check by setting the context parameter '") - .append(MultiStageQueryContext.CTX_SKIP_TYPE_VERIFICATION) - .append("' to[") - .append(columnName) - .append("]. See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode " + messageBuilder.append(". See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode " + "for more details."); throw InvalidSqlInput.exception(StringUtils.encodeForFormat(messageBuilder.toString())); From ef7e0817b55483c44861def185ca4bb2570fe51a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 12 Mar 2024 23:11:32 -0700 Subject: [PATCH 6/8] Adjust message. --- .../main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 1a81d28b7c39..8fbb52df2733 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -411,7 +411,7 @@ private static void validateTypeChanges( } messageBuilder.append(". See https://druid.apache.org/docs/latest/querying/arrays#arrayingestmode " - + "for more details."); + + "for more details about this check and how to override it if needed."); throw InvalidSqlInput.exception(StringUtils.encodeForFormat(messageBuilder.toString())); } From a7654126a8770b6e4ba28e2fd9d92b18ce5c2635 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 12 Mar 2024 23:20:30 -0700 Subject: [PATCH 7/8] Fix tests. --- .../druid/sql/avatica/DruidAvaticaHandlerTest.java | 12 ++++++++++++ .../apache/druid/sql/calcite/CalciteQueryTest.java | 2 ++ 2 files changed, 14 insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index 193ba0c7f1b1..282c70d57727 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -524,6 +524,12 @@ public void testDatabaseMetaDataTables() throws SQLException final DatabaseMetaData metaData = client.getMetaData(); Assert.assertEquals( ImmutableList.of( + row( + Pair.of("TABLE_CAT", "druid"), + Pair.of("TABLE_NAME", CalciteTests.ARRAYS_DATASOURCE), + Pair.of("TABLE_SCHEM", "druid"), + Pair.of("TABLE_TYPE", "TABLE") + ), row( Pair.of("TABLE_CAT", "druid"), Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE), @@ -605,6 +611,12 @@ public void testDatabaseMetaDataTablesAsSuperuser() throws SQLException final DatabaseMetaData metaData = superuserClient.getMetaData(); Assert.assertEquals( ImmutableList.of( + row( + Pair.of("TABLE_CAT", "druid"), + Pair.of("TABLE_NAME", CalciteTests.ARRAYS_DATASOURCE), + Pair.of("TABLE_SCHEM", "druid"), + Pair.of("TABLE_TYPE", "TABLE") + ), row( Pair.of("TABLE_CAT", "druid"), Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 8515fb8f9ac8..272f578be27d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -177,6 +177,7 @@ public void testInformationSchemaTables() + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", ImmutableList.of(), ImmutableList.builder() + .add(new Object[]{"druid", CalciteTests.ARRAYS_DATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"}) .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"}) @@ -217,6 +218,7 @@ public void testInformationSchemaTables() CalciteTests.SUPER_USER_AUTH_RESULT, ImmutableList.of(), ImmutableList.builder() + .add(new Object[]{"druid", CalciteTests.ARRAYS_DATASOURCE, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"}) .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"}) .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"}) From 09d63d243ff1225f58dc285827ccfc41a07a530f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 13 Mar 2024 09:56:14 -0700 Subject: [PATCH 8/8] Fix test in DV mode. --- .../apache/druid/msq/exec/MSQArraysTest.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java index 7096b840a236..cbf2d68a2850 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQArraysTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; @@ -112,14 +113,14 @@ public void setup() throws IOException dataFileNameJsonString = queryFramework().queryJsonMapper().writeValueAsString(dataFile); RowSignature dataFileSignature = RowSignature.builder() - .add("timestamp", ColumnType.STRING) - .add("arrayString", ColumnType.STRING_ARRAY) - .add("arrayStringNulls", ColumnType.STRING_ARRAY) - .add("arrayLong", ColumnType.LONG_ARRAY) - .add("arrayLongNulls", ColumnType.LONG_ARRAY) - .add("arrayDouble", ColumnType.DOUBLE_ARRAY) - .add("arrayDoubleNulls", ColumnType.DOUBLE_ARRAY) - .build(); + .add("timestamp", ColumnType.STRING) + .add("arrayString", ColumnType.STRING_ARRAY) + .add("arrayStringNulls", ColumnType.STRING_ARRAY) + .add("arrayLong", ColumnType.LONG_ARRAY) + .add("arrayLongNulls", ColumnType.LONG_ARRAY) + .add("arrayDouble", ColumnType.DOUBLE_ARRAY) + .add("arrayDoubleNulls", ColumnType.DOUBLE_ARRAY) + .build(); dataFileSignatureJsonString = queryFramework().queryJsonMapper().writeValueAsString(dataFileSignature); dataFileExternalDataSource = new ExternalDataSource( @@ -258,7 +259,8 @@ public void testReplaceMvdWithStringArraySkipValidation() .setExpectedRowSignature(rowSignature) .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0))) .setExpectedResultRows( - ImmutableList.of( + NullHandling.sqlCompatible() + ? ImmutableList.of( new Object[]{0L, null}, new Object[]{0L, null}, new Object[]{0L, new Object[]{"a", "b"}}, @@ -266,6 +268,14 @@ public void testReplaceMvdWithStringArraySkipValidation() new Object[]{0L, new Object[]{"b", "c"}}, new Object[]{0L, new Object[]{"d"}} ) + : ImmutableList.of( + new Object[]{0L, null}, + new Object[]{0L, null}, + new Object[]{0L, null}, + new Object[]{0L, new Object[]{"a", "b"}}, + new Object[]{0L, new Object[]{"b", "c"}}, + new Object[]{0L, new Object[]{"d"}} + ) ) .verifyResults(); } @@ -299,7 +309,7 @@ public void testReplaceMvdWithMvd() ImmutableList.of( new Object[]{0L, null}, new Object[]{0L, null}, - new Object[]{0L, ""}, + new Object[]{0L, NullHandling.sqlCompatible() ? "" : null}, new Object[]{0L, ImmutableList.of("a", "b")}, new Object[]{0L, ImmutableList.of("b", "c")}, new Object[]{0L, "d"}