@@ -1900,12 +1900,19 @@ private static Pair<List<DimensionSchema>, List<AggregatorFactory>> makeDimensio
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type
type,
query.context()
);
} else {
// complex columns only
if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumnName, type));
dimensions.add(
DimensionSchemaUtils.createDimensionSchema(
outputColumnName,
type,
MultiStageQueryContext.useAutoColumnSchemas(query.context())
)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
} else {
@@ -1914,7 +1921,8 @@
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type
type,
query.context()
);
}
}
@@ -1940,13 +1948,20 @@ private static void populateDimensionsAndAggregators(
List<AggregatorFactory> aggregators,
Map<String, AggregatorFactory> outputColumnAggregatorFactories,
String outputColumn,
ColumnType type
ColumnType type,
QueryContext context
)
{
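// If the output column is backed by an aggregator, add it as a metric;
// otherwise add it as a dimension, honoring the useAutoColumnSchemas context flag.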
if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumn, type));
dimensions.add(
DimensionSchemaUtils.createDimensionSchema(
outputColumn,
type,
MultiStageQueryContext.useAutoColumnSchemas(context)
)
);
}
}

@@ -126,7 +126,8 @@ private static Iterator<SegmentWithDescriptor> inputSourceSegmentIterator(
column ->
DimensionSchemaUtils.createDimensionSchema(
column,
signature.getColumnType(column).orElse(null)
signature.getColumnType(column).orElse(null),
false
)
).collect(Collectors.toList())
),
@@ -25,10 +25,12 @@
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;

import javax.annotation.Nullable;

@@ -38,33 +40,49 @@
*/
public class DimensionSchemaUtils
{
public static DimensionSchema createDimensionSchema(final String column, @Nullable final ColumnType type)
public static DimensionSchema createDimensionSchema(
final String column,
@Nullable final ColumnType type,
boolean useAutoType
)
{
// if schema information not available, create a string dimension
if (type == null) {
return new StringDimensionSchema(column);
}

switch (type.getType()) {
case STRING:
return new StringDimensionSchema(column);
case LONG:
return new LongDimensionSchema(column);
case FLOAT:
return new FloatDimensionSchema(column);
case DOUBLE:
return new DoubleDimensionSchema(column);
case ARRAY:
switch (type.getElementType().getType()) {
case STRING:
return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null);
default:
throw new ISE("Cannot create dimension for type [%s]", type.toString());
}
default:
if (useAutoType) {
// for complex types that are not COMPLEX<json>, we still want to use the handler since 'auto' typing
// only works for the 'standard' built-in types
if (type != null && type.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(type)) {
final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type);
return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null)
.getDimensionSchema(capabilities);
}

return new AutoTypeColumnSchema(column);
} else {
// if schema information not available, create a string dimension
if (type == null) {
return new StringDimensionSchema(column);
}

switch (type.getType()) {
case STRING:
return new StringDimensionSchema(column);
case LONG:
return new LongDimensionSchema(column);
case FLOAT:
return new FloatDimensionSchema(column);
case DOUBLE:
return new DoubleDimensionSchema(column);
case ARRAY:
switch (type.getElementType().getType()) {
case STRING:
return new StringDimensionSchema(column, DimensionSchema.MultiValueHandling.ARRAY, null);
default:
throw new ISE("Cannot create dimension for type [%s]", type.toString());
}
default:
final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type);
return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null)
.getDimensionSchema(capabilities);
}
}
}
}
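To make the new branching concrete, here is a minimal sketch (not part of the diff) of what the reworked method returns for a few inputs; the class name is hypothetical and the package for DimensionSchemaUtils is assumed from this MSQ module:

import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.msq.util.DimensionSchemaUtils; // package assumed
import org.apache.druid.segment.column.ColumnType;

public class CreateDimensionSchemaSketch
{
  public static void main(String[] args)
  {
    // useAutoType = false: legacy behavior, standard types map to their
    // dedicated schemas (here, StringDimensionSchema).
    DimensionSchema legacy = DimensionSchemaUtils.createDimensionSchema("page", ColumnType.STRING, false);
    System.out.println(legacy.getClass().getSimpleName()); // StringDimensionSchema

    // useAutoType = true: standard types (and COMPLEX<json>) all map to
    // AutoTypeColumnSchema, which discovers the physical type at ingest time.
    DimensionSchema auto = DimensionSchemaUtils.createDimensionSchema("page", ColumnType.STRING, true);
    System.out.println(auto.getClass().getSimpleName()); // AutoTypeColumnSchema

    // Unknown type without auto typing still falls back to a string dimension.
    DimensionSchema fallback = DimensionSchemaUtils.createDimensionSchema("tags", null, false);
    System.out.println(fallback.getClass().getSimpleName()); // StringDimensionSchema
  }
}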
@@ -63,6 +63,11 @@
* <li><b>clusterStatisticsMergeMode</b>: Whether to use parallel or sequential mode for merging of the worker sketches.
* Can be <b>PARALLEL</b>, <b>SEQUENTIAL</b> or <b>AUTO</b>. See {@link ClusterStatisticsMergeMode} for more information on each mode.
* Default value is <b>PARALLEL</b></li>
*
* <li><b>useAutoColumnSchemas</b>: Temporary flag to allow experimentation using
* {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation,
* see {@link DimensionSchemaUtils#createDimensionSchema} for more details.
*
* </ol>
**/
public class MultiStageQueryContext
@@ -109,6 +114,8 @@ public class MultiStageQueryContext

public static final String CTX_INDEX_SPEC = "indexSpec";

public static final String CTX_USE_AUTO_SCHEMAS = "useAutoColumnSchemas";

private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);

public static String getMSQMode(final QueryContext queryContext)
@@ -213,6 +220,11 @@ public static IndexSpec getIndexSpec(final QueryContext queryContext, final Obje
return decodeIndexSpec(queryContext.get(CTX_INDEX_SPEC), objectMapper);
}

public static boolean useAutoColumnSchemas(final QueryContext queryContext)
{
  return queryContext.getBoolean(CTX_USE_AUTO_SCHEMAS, false);
}

Review comment (Contributor): Please document the use of this externally undocumented feature in the class level docs. https://github.com/apache/druid/pull/14175/files#diff-cbcc2f2c9d571876786fe63eef35c2d82d64de0700334267c22f83f31d633af0L51
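As a usage sketch (not part of the diff, class name hypothetical), the flag is read with the same QueryContext.of(...) factory the tests below use:

import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;

public class UseAutoSchemasSketch
{
  public static void main(String[] args)
  {
    // Absent from the context: the getter falls back to its default of false.
    QueryContext defaults = QueryContext.of(ImmutableMap.of());
    System.out.println(MultiStageQueryContext.useAutoColumnSchemas(defaults)); // false

    // Set to true: segment generation builds AutoTypeColumnSchema dimensions.
    QueryContext enabled = QueryContext.of(
        ImmutableMap.of(MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS, true)
    );
    System.out.println(MultiStageQueryContext.useAutoColumnSchemas(enabled)); // true
  }
}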

/**
* Decodes {@link #CTX_SORT_ORDER} from either a JSON or CSV string.
*/
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
Expand All @@ -34,13 +35,15 @@
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
Expand All @@ -51,9 +54,13 @@
import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -436,7 +443,7 @@ public void testInsertOnFoo1WithMultiValueDim()
}

@Test
public void testInsertOnFoo1WithLimitWithoutClusterBy()
public void testInsertOnFoo1MultiValueDimWithLimitWithoutClusterBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -453,7 +460,7 @@ public void testInsertOnFoo1WithLimitWithoutClusterBy()
}

@Test
public void testInsertOnFoo1WithLimitWithClusterBy()
public void testInsertOnFoo1MultiValueDimWithLimitWithClusterBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -519,6 +526,40 @@ public void testInsertOnFoo1WithMultiValueToArrayGroupBy()
.verifyResults();
}

@Test
public void testInsertOnFoo1WithAutoTypeArrayGroupBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim3", ColumnType.STRING_ARRAY).build();

final Map<String, Object> adjustedContext = new HashMap<>(context);
adjustedContext.put(MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS, true);

testIngestQuery().setSql(
"INSERT INTO foo1 SELECT MV_TO_ARRAY(dim3) as dim3 FROM foo GROUP BY 1 PARTITIONED BY ALL TIME")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(adjustedContext)
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.setExpectedResultRows(
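// In default-value mode, an empty string behaves as null, so the [""] group
// collapses into the [null] group and one fewer row is expected.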
NullHandling.replaceWithDefault() ?
ImmutableList.of(
new Object[]{0L, new Object[]{null}},
new Object[]{0L, new Object[]{"a", "b"}},
new Object[]{0L, new Object[]{"b", "c"}},
new Object[]{0L, new Object[]{"d"}}
) : ImmutableList.of(
new Object[]{0L, new Object[]{null}},
new Object[]{0L, new Object[]{"a", "b"}},
new Object[]{0L, new Object[]{""}},
new Object[]{0L, new Object[]{"b", "c"}},
new Object[]{0L, new Object[]{"d"}}
)
)
.verifyResults();
}

@Test
public void testInsertOnFoo1WithMultiValueDimGroupByWithoutGroupByEnable()
{
@@ -1024,6 +1065,64 @@ public void testInsertOffsetThrowsException()
.verifyPlanningErrors();
}

@Test
public void testInsertArraysAutoType() throws IOException
{
List<Object[]> expectedRows = Arrays.asList(
new Object[]{1672531200000L, null, null, null},
new Object[]{1672531200000L, null, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672531200000L, new Object[]{"d", "e"}, new Object[]{1L, 4L}, new Object[]{2.2, 3.3, 4.0}},
new Object[]{1672531200000L, new Object[]{"a", "b"}, null, null},
new Object[]{1672531200000L, new Object[]{"a", "b"}, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672531200000L, new Object[]{"b", "c"}, new Object[]{1L, 2L, 3L, 4L}, new Object[]{1.1, 3.3}},
new Object[]{1672531200000L, new Object[]{"a", "b", "c"}, new Object[]{2L, 3L}, new Object[]{3.3, 4.4, 5.5}},
new Object[]{1672617600000L, null, null, null},
new Object[]{1672617600000L, null, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672617600000L, new Object[]{"d", "e"}, new Object[]{1L, 4L}, new Object[]{2.2, 3.3, 4.0}},
new Object[]{1672617600000L, new Object[]{"a", "b"}, null, null},
new Object[]{1672617600000L, new Object[]{"a", "b"}, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672617600000L, new Object[]{"b", "c"}, new Object[]{1L, 2L, 3L, 4L}, new Object[]{1.1, 3.3}},
new Object[]{1672617600000L, new Object[]{"a", "b", "c"}, new Object[]{2L, 3L}, new Object[]{3.3, 4.4, 5.5}}
);

RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("arrayString", ColumnType.STRING_ARRAY)
.add("arrayLong", ColumnType.LONG_ARRAY)
.add("arrayDouble", ColumnType.DOUBLE_ARRAY)
.build();

final Map<String, Object> adjustedContext = new HashMap<>(context);
adjustedContext.put(MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS, true);

final File tmpFile = temporaryFolder.newFile();
final InputStream resourceStream = NestedDataTestUtils.class.getClassLoader().getResourceAsStream(NestedDataTestUtils.ARRAY_TYPES_DATA_FILE);
final InputStream decompressing = CompressionUtils.decompress(resourceStream, NestedDataTestUtils.ARRAY_TYPES_DATA_FILE);
Files.copy(decompressing, tmpFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
decompressing.close();

final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(tmpFile);
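// The EXTERN signature below declares the array columns as COMPLEX<json>;
// with useAutoColumnSchemas enabled they land as typed ARRAY columns,
// matching the STRING_ARRAY / LONG_ARRAY / DOUBLE_ARRAY row signature above.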

testIngestQuery().setSql(" INSERT INTO foo1 SELECT\n"
+ " TIME_PARSE(\"timestamp\") as __time,\n"
+ " arrayString,\n"
+ " arrayLong,\n"
+ " arrayDouble\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadFileNameAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"arrayString\", \"type\": \"COMPLEX<json>\"}, {\"name\": \"arrayLong\", \"type\": \"COMPLEX<json>\"}, {\"name\": \"arrayDouble\", \"type\": \"COMPLEX<json>\"}]'\n"
+ " )\n"
+ ") PARTITIONED BY day")
.setQueryContext(adjustedContext)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.verifyResults();

}

@Nonnull
private List<Object[]> expectedFooRows()
{
@@ -50,6 +50,7 @@
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.guice.annotations.EscalatedGlobal;
@@ -301,6 +302,8 @@ public void configure(Binder binder)
{
// We want this module to bring InputSourceModule along for the ride.
binder.install(new InputSourceModule());
binder.install(new NestedDataModule());
NestedDataModule.registerHandlersAndSerde();
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
}

@@ -1129,7 +1132,7 @@ public void verifyResults()

log.info(
"Found rows which are sorted forcefully %s",
transformedOutputRows.stream().map(a -> Arrays.toString(a)).collect(Collectors.joining("\n"))
transformedOutputRows.stream().map(Arrays::deepToString).collect(Collectors.joining("\n"))
);


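A quick aside on the Arrays::deepToString swap above (illustrative sketch only): array-typed columns make each output row an Object[] that can itself contain Object[] values, which Arrays.toString renders as identity hash codes:

import java.util.Arrays;

public class DeepToStringSketch
{
  public static void main(String[] args)
  {
    Object[] row = new Object[]{1672531200000L, new Object[]{"a", "b"}};
    System.out.println(Arrays.toString(row));     // e.g. [1672531200000, [Ljava.lang.Object;@1b6d3586]
    System.out.println(Arrays.deepToString(row)); // [1672531200000, [a, b]]
  }
}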
@@ -48,6 +48,7 @@
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_ROWS_PER_SEGMENT;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_SORT_ORDER;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS;
import static org.apache.druid.msq.util.MultiStageQueryContext.DEFAULT_MAX_NUM_TASKS;

public class MultiStageQueryContextTest
@@ -258,6 +259,13 @@ public void getMSQModeParameterSetReturnsCorrectValue()
Assert.assertEquals("nonStrict", MultiStageQueryContext.getMSQMode(QueryContext.of(propertyMap)));
}

@Test
public void testUseAutoSchemas()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_USE_AUTO_SCHEMAS, true);
Assert.assertTrue(MultiStageQueryContext.useAutoColumnSchemas(QueryContext.of(propertyMap)));
}

private static List<String> decodeSortOrder(@Nullable final String input)
{
return MultiStageQueryContext.decodeSortOrder(input);