diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index c65dfc549833..67df6965494d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1900,12 +1900,19 @@ private static Pair, List> makeDimensio aggregators, outputColumnAggregatorFactories, outputColumnName, - type + type, + query.context() ); } else { // complex columns only if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) { - dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumnName, type)); + dimensions.add( + DimensionSchemaUtils.createDimensionSchema( + outputColumnName, + type, + MultiStageQueryContext.useAutoColumnSchemas(query.context()) + ) + ); } else if (!isRollupQuery) { aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName())); } else { @@ -1914,7 +1921,8 @@ private static Pair, List> makeDimensio aggregators, outputColumnAggregatorFactories, outputColumnName, - type + type, + query.context() ); } } @@ -1940,13 +1948,20 @@ private static void populateDimensionsAndAggregators( List aggregators, Map outputColumnAggregatorFactories, String outputColumn, - ColumnType type + ColumnType type, + QueryContext context ) { if (outputColumnAggregatorFactories.containsKey(outputColumn)) { aggregators.add(outputColumnAggregatorFactories.get(outputColumn)); } else { - dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumn, type)); + dimensions.add( + DimensionSchemaUtils.createDimensionSchema( + outputColumn, + type, + MultiStageQueryContext.useAutoColumnSchemas(context) + ) + ); } } diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java index 8e7055284766..8c4d287e77d8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java @@ -126,7 +126,8 @@ private static Iterator inputSourceSegmentIterator( column -> DimensionSchemaUtils.createDimensionSchema( column, - signature.getColumnType(column).orElse(null) + signature.getColumnType(column).orElse(null), + false ) ).collect(Collectors.toList()) ), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java index 73e2ff048800..c7b20ac46e08 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/DimensionSchemaUtils.java @@ -25,10 +25,12 @@ import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.AutoTypeColumnSchema; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; import javax.annotation.Nullable; @@ -38,33 +40,49 @@ */ public class DimensionSchemaUtils { - public static DimensionSchema createDimensionSchema(final String column, @Nullable final ColumnType type) + public static 
DimensionSchema createDimensionSchema( + final String column, + @Nullable final ColumnType type, + boolean useAutoType + ) { - // if schema information not available, create a string dimension - if (type == null) { - return new StringDimensionSchema(column); - } - - switch (type.getType()) { - case STRING: - return new StringDimensionSchema(column); - case LONG: - return new LongDimensionSchema(column); - case FLOAT: - return new FloatDimensionSchema(column); - case DOUBLE: - return new DoubleDimensionSchema(column); - case ARRAY: - switch (type.getElementType().getType()) { - case STRING: - return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null); - default: - throw new ISE("Cannot create dimension for type [%s]", type.toString()); - } - default: + if (useAutoType) { + // for complex types other than nested data, we still want to use the handler since 'auto' typing + // only works for the 'standard' built-in types + if (type != null && type.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(type)) { final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type); return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null) .getDimensionSchema(capabilities); + } + + return new AutoTypeColumnSchema(column); + } else { + // if schema information not available, create a string dimension + if (type == null) { + return new StringDimensionSchema(column); + } + + switch (type.getType()) { + case STRING: + return new StringDimensionSchema(column); + case LONG: + return new LongDimensionSchema(column); + case FLOAT: + return new FloatDimensionSchema(column); + case DOUBLE: + return new DoubleDimensionSchema(column); + case ARRAY: + switch (type.getElementType().getType()) { + case STRING: + return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null); + default: + throw new ISE("Cannot create dimension for type [%s]", type.toString()); + } + default: + final 
ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type); + return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null) + .getDimensionSchema(capabilities); + } } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index d42b6eb83dc5..38bd1f6665b3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -63,6 +63,11 @@ *
  • clusterStatisticsMergeMode: Whether to use parallel or sequential mode for merging of the worker sketches. * Can be PARALLEL, SEQUENTIAL or AUTO. See {@link ClusterStatisticsMergeMode} for more information on each mode. * Default value is PARALLEL
  • + * + *
  • useAutoColumnSchemas: Temporary flag to allow experimentation using + * {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation, + * see {@link DimensionSchemaUtils#createDimensionSchema} for more details. + * * **/ public class MultiStageQueryContext @@ -109,6 +114,8 @@ public class MultiStageQueryContext public static final String CTX_INDEX_SPEC = "indexSpec"; + public static final String CTX_USE_AUTO_SCHEMAS = "useAutoColumnSchemas"; + private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL); public static String getMSQMode(final QueryContext queryContext) @@ -213,6 +220,11 @@ public static IndexSpec getIndexSpec(final QueryContext queryContext, final Obje return decodeIndexSpec(queryContext.get(CTX_INDEX_SPEC), objectMapper); } + public static boolean useAutoColumnSchemas(final QueryContext queryContext) + { + return queryContext.getBoolean(CTX_USE_AUTO_SCHEMAS, false); + } + /** * Decodes {@link #CTX_SORT_ORDER} from either a JSON or CSV string. 
*/ diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 33e30986bef9..62960657626b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.hll.HyperLogLogCollector; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -34,6 +35,7 @@ import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.segment.column.ColumnType; @@ -41,6 +43,7 @@ import org.apache.druid.segment.column.ValueType; import org.apache.druid.sql.SqlPlanningException; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CompressionUtils; import org.hamcrest.CoreMatchers; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; @@ -51,9 +54,13 @@ import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -436,7 +443,7 @@ public void testInsertOnFoo1WithMultiValueDim() } @Test 
- public void testInsertOnFoo1WithLimitWithoutClusterBy() + public void testInsertOnFoo1MultiValueDimWithLimitWithoutClusterBy() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -453,7 +460,7 @@ public void testInsertOnFoo1WithLimitWithoutClusterBy() } @Test - public void testInsertOnFoo1WithLimitWithClusterBy() + public void testInsertOnFoo1MultiValueDimWithLimitWithClusterBy() { RowSignature rowSignature = RowSignature.builder() .add("__time", ColumnType.LONG) @@ -519,6 +526,40 @@ public void testInsertOnFoo1WithMultiValueToArrayGroupBy() .verifyResults(); } + @Test + public void testInsertOnFoo1WithAutoTypeArrayGroupBy() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim3", ColumnType.STRING_ARRAY).build(); + + final Map adjustedContext = new HashMap<>(context); + adjustedContext.put(MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS, true); + + testIngestQuery().setSql( + "INSERT INTO foo1 SELECT MV_TO_ARRAY(dim3) as dim3 FROM foo GROUP BY 1 PARTITIONED BY ALL TIME") + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setQueryContext(adjustedContext) + .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0))) + .setExpectedResultRows( + NullHandling.replaceWithDefault() ? 
+ ImmutableList.of( + new Object[]{0L, new Object[]{null}}, + new Object[]{0L, new Object[]{"a", "b"}}, + new Object[]{0L, new Object[]{"b", "c"}}, + new Object[]{0L, new Object[]{"d"}} + ) : ImmutableList.of( + new Object[]{0L, new Object[]{null}}, + new Object[]{0L, new Object[]{"a", "b"}}, + new Object[]{0L, new Object[]{""}}, + new Object[]{0L, new Object[]{"b", "c"}}, + new Object[]{0L, new Object[]{"d"}} + ) + ) + .verifyResults(); + } + @Test public void testInsertOnFoo1WithMultiValueDimGroupByWithoutGroupByEnable() { @@ -1024,6 +1065,64 @@ public void testInsertOffsetThrowsException() .verifyPlanningErrors(); } + @Test + public void testInsertArraysAutoType() throws IOException + { + List expectedRows = Arrays.asList( + new Object[]{1672531200000L, null, null, null}, + new Object[]{1672531200000L, null, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}}, + new Object[]{1672531200000L, new Object[]{"d", "e"}, new Object[]{1L, 4L}, new Object[]{2.2, 3.3, 4.0}}, + new Object[]{1672531200000L, new Object[]{"a", "b"}, null, null}, + new Object[]{1672531200000L, new Object[]{"a", "b"}, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}}, + new Object[]{1672531200000L, new Object[]{"b", "c"}, new Object[]{1L, 2L, 3L, 4L}, new Object[]{1.1, 3.3}}, + new Object[]{1672531200000L, new Object[]{"a", "b", "c"}, new Object[]{2L, 3L}, new Object[]{3.3, 4.4, 5.5}}, + new Object[]{1672617600000L, null, null, null}, + new Object[]{1672617600000L, null, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}}, + new Object[]{1672617600000L, new Object[]{"d", "e"}, new Object[]{1L, 4L}, new Object[]{2.2, 3.3, 4.0}}, + new Object[]{1672617600000L, new Object[]{"a", "b"}, null, null}, + new Object[]{1672617600000L, new Object[]{"a", "b"}, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}}, + new Object[]{1672617600000L, new Object[]{"b", "c"}, new Object[]{1L, 2L, 3L, 4L}, new Object[]{1.1, 3.3}}, + new Object[]{1672617600000L, new Object[]{"a", "b", "c"}, new 
Object[]{2L, 3L}, new Object[]{3.3, 4.4, 5.5}} + ); + + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("arrayString", ColumnType.STRING_ARRAY) + .add("arrayLong", ColumnType.LONG_ARRAY) + .add("arrayDouble", ColumnType.DOUBLE_ARRAY) + .build(); + + final Map adjustedContext = new HashMap<>(context); + adjustedContext.put(MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS, true); + + final File tmpFile = temporaryFolder.newFile(); + final InputStream resourceStream = NestedDataTestUtils.class.getClassLoader().getResourceAsStream(NestedDataTestUtils.ARRAY_TYPES_DATA_FILE); + final InputStream decompressing = CompressionUtils.decompress(resourceStream, NestedDataTestUtils.ARRAY_TYPES_DATA_FILE); + Files.copy(decompressing, tmpFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + decompressing.close(); + + final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(tmpFile); + + testIngestQuery().setSql(" INSERT INTO foo1 SELECT\n" + + " TIME_PARSE(\"timestamp\") as __time,\n" + + " arrayString,\n" + + " arrayLong,\n" + + " arrayDouble\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + toReadFileNameAsJson + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"arrayString\", \"type\": \"COMPLEX\"}, {\"name\": \"arrayLong\", \"type\": \"COMPLEX\"}, {\"name\": \"arrayDouble\", \"type\": \"COMPLEX\"}]'\n" + + " )\n" + + ") PARTITIONED BY day") + .setQueryContext(adjustedContext) + .setExpectedResultRows(expectedRows) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .verifyResults(); + + } + @Nonnull private List expectedFooRows() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 493be17284f4..d768c73d80d2 100644 --- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -50,6 +50,7 @@ import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.guice.JoinableFactoryModule; import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.NestedDataModule; import org.apache.druid.guice.SegmentWranglerModule; import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.guice.annotations.EscalatedGlobal; @@ -301,6 +302,8 @@ public void configure(Binder binder) { // We want this module to bring InputSourceModule along for the ride. binder.install(new InputSourceModule()); + binder.install(new NestedDataModule()); + NestedDataModule.registerHandlersAndSerde(); SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class); } @@ -1129,7 +1132,7 @@ public void verifyResults() log.info( "Found rows which are sorted forcefully %s", - transformedOutputRows.stream().map(a -> Arrays.toString(a)).collect(Collectors.joining("\n")) + transformedOutputRows.stream().map(Arrays::deepToString).collect(Collectors.joining("\n")) ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java index c118a40e89aa..0439b66f17de 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java @@ -48,6 +48,7 @@ import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_ROWS_PER_SEGMENT; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_SORT_ORDER; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY; +import 
static org.apache.druid.msq.util.MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS; import static org.apache.druid.msq.util.MultiStageQueryContext.DEFAULT_MAX_NUM_TASKS; public class MultiStageQueryContextTest @@ -258,6 +259,13 @@ public void getMSQModeParameterSetReturnsCorrectValue() Assert.assertEquals("nonStrict", MultiStageQueryContext.getMSQMode(QueryContext.of(propertyMap))); } + @Test + public void testUseAutoSchemas() + { + Map propertyMap = ImmutableMap.of(CTX_USE_AUTO_SCHEMAS, true); + Assert.assertTrue(MultiStageQueryContext.useAutoColumnSchemas(QueryContext.of(propertyMap))); + } + private static List decodeSortOrder(@Nullable final String input) { return MultiStageQueryContext.decodeSortOrder(input);