diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 4539232b0f16..f9ab426d13c2 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -456,7 +456,7 @@ protected void reduce(final BytesWritable key, Iterable values, f dimOrder.addAll(index.getDimensionOrder()); log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason()); flushIndexToContextAndClose(key, index, context); - index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnCapabilities()); + index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnHandlerCapabilities()); } index.add(value); @@ -752,7 +752,7 @@ public void doRun() combiningAggs, config, allDimensionNames, - persistIndex.getColumnCapabilities() + persistIndex.getColumnHandlerCapabilities() ); startTime = System.currentTimeMillis(); ++indexCount; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index c27f2ec7a8cd..da36ff618bde 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -1109,7 +1109,7 @@ private void processDimensionsSpec(final QueryableIndex index) dimension, createDimensionSchema( dimension, - columnHolder.getCapabilities(), + columnHolder.getHandlerCapabilities(), dimensionHandler.getMultivalueHandling() ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index c6b6d6fdf0a4..b3db2b71d1b9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -252,7 +252,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException new ParallelIndexTuningConfig( null, null, - new OnheapIncrementalIndex.Spec(true), + new OnheapIncrementalIndex.Spec(true, false), 40000, 2000L, null, @@ -316,7 +316,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException ), new ClientCompactionTaskQueryTuningConfig( 100, - new OnheapIncrementalIndex.Spec(true), + new OnheapIncrementalIndex.Spec(true, false), 40000, 2000L, 30000L, diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java index 94b19e2743ce..6deadfbb131e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/any/StringAnyAggregatorFactory.java @@ -84,7 +84,6 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) @Override public StringAnyVectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) { - ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); // null capabilities mean the column doesn't exist, so in vector engines the selector will never be multi-value if (capabilities != null && capabilities.hasMultipleValues().isMaybeTrue()) { diff --git a/processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java 
b/processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java index e8cc3ceac022..833f46428b7d 100644 --- a/processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java +++ b/processing/src/main/java/org/apache/druid/query/expression/NestedDataExpressions.java @@ -480,10 +480,10 @@ public Expr apply(List args) final StructuredDataProcessor processor = new StructuredDataProcessor() { @Override - public int processLiteralField(ArrayList fieldPath, Object fieldValue) + public StructuredDataProcessor.ProcessedLiteral processLiteralField(ArrayList fieldPath, Object fieldValue) { // do nothing, we only want the list of fields returned by this processor - return 0; + return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL; } }; diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java index 041d8566d35a..859396170b19 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java @@ -222,11 +222,15 @@ private ColumnAnalysis analyzeStringColumn( } else if (capabilities.isDictionaryEncoded().isTrue()) { // fallback if no bitmap index try (BaseColumn column = columnHolder.getColumn()) { - DictionaryEncodedColumn theColumn = (DictionaryEncodedColumn) column; - cardinality = theColumn.getCardinality(); - if (analyzingMinMax() && cardinality > 0) { - min = NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(0)); - max = NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(cardinality - 1)); + if (column instanceof DictionaryEncodedColumn) { + DictionaryEncodedColumn theColumn = (DictionaryEncodedColumn) column; + cardinality = theColumn.getCardinality(); + if (analyzingMinMax() && cardinality > 0) { + min = NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(0)); + max = 
NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(cardinality - 1)); + } + } else { + cardinality = 0; } } catch (IOException e) { diff --git a/processing/src/main/java/org/apache/druid/segment/DimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/DimensionIndexer.java index 3e379c37c212..21e637727343 100644 --- a/processing/src/main/java/org/apache/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/DimensionIndexer.java @@ -236,6 +236,12 @@ ColumnValueSelector makeColumnValueSelector( ); ColumnCapabilities getColumnCapabilities(); + + default ColumnCapabilities getHandlerCapabilities() + { + return getColumnCapabilities(); + } + /** * Compares the row values for this DimensionIndexer's dimension from a Row key. * @@ -301,6 +307,11 @@ boolean checkUnsortedEncodedKeyComponentsEqual( * is needed to be able to correctly map per-segment encoded values to global values on the next conversion step, * {@link DimensionMerger#convertSortedSegmentRowValuesToMergedRowValues}. The latter method requires sorted encoding * values on the input, because {@link DimensionMerger#writeMergedValueDictionary} takes sorted lookups as it's input. + * + * For columns which do not use the {@link DimensionMerger} to merge dictionary encoded values, this method should + * provide a selector which is compatible with the expectations of + * {@link DimensionMerger#processMergedRow(ColumnValueSelector)}, which might simply be to pass-through the 'unsorted' + * selector. 
*/ ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues); diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index daa4696371f7..bae6d94fbbc6 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -45,6 +45,7 @@ import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnDescriptor; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.TypeSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.data.GenericIndexed; import org.apache.druid.segment.incremental.IncrementalIndex; @@ -262,10 +263,9 @@ private File makeIndexFiles( log.debug("Completed factory.json in %,d millis", System.currentTimeMillis() - startTime); progress.progress(); - final Map metricsValueTypes = new TreeMap<>(Comparators.naturalNullsFirst()); - final Map metricTypeNames = new TreeMap<>(Comparators.naturalNullsFirst()); + final Map> metricTypes = new TreeMap<>(Comparators.naturalNullsFirst()); final List dimCapabilities = Lists.newArrayListWithCapacity(mergedDimensions.size()); - mergeCapabilities(adapters, mergedDimensions, metricsValueTypes, metricTypeNames, dimCapabilities); + mergeCapabilities(adapters, mergedDimensions, metricTypes, dimCapabilities); final Map handlers = makeDimensionHandlers(mergedDimensions, dimCapabilities); final List mergers = new ArrayList<>(); @@ -301,7 +301,7 @@ private File makeIndexFiles( closer.register(timeAndDimsIterator); final GenericColumnSerializer timeWriter = setupTimeWriter(segmentWriteOutMedium, indexSpec); final ArrayList metricWriters = - setupMetricsWriters(segmentWriteOutMedium, mergedMetrics, metricsValueTypes, metricTypeNames, indexSpec); + 
setupMetricsWriters(segmentWriteOutMedium, mergedMetrics, metricTypes, indexSpec); IndexMergeResult indexMergeResult = mergeIndexesAndWriteColumns( adapters, progress, @@ -320,8 +320,7 @@ private File makeIndexFiles( v9Smoosher, progress, mergedMetrics, - metricsValueTypes, - metricTypeNames, + metricTypes, metricWriters, indexSpec ); @@ -494,8 +493,7 @@ private void makeMetricsColumns( final FileSmoosher v9Smoosher, final ProgressIndicator progress, final List mergedMetrics, - final Map metricsValueTypes, - final Map metricTypeNames, + final Map> metricsTypes, final List metWriters, final IndexSpec indexSpec ) throws IOException @@ -510,8 +508,8 @@ private void makeMetricsColumns( GenericColumnSerializer writer = metWriters.get(i); final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - ValueType type = metricsValueTypes.get(metric); - switch (type) { + TypeSignature type = metricsTypes.get(metric); + switch (type.getType()) { case LONG: builder.setValueType(ValueType.LONG); builder.addSerde(createLongColumnPartSerde(writer, indexSpec)); @@ -525,7 +523,7 @@ private void makeMetricsColumns( builder.addSerde(createDoubleColumnPartSerde(writer, indexSpec)); break; case COMPLEX: - final String typeName = metricTypeNames.get(metric); + final String typeName = type.getComplexTypeName(); builder.setValueType(ValueType.COMPLEX); builder.addSerde( ComplexColumnPartSerde @@ -686,7 +684,7 @@ private IndexMergeResult mergeIndexesAndWriteColumns( } for (int dimIndex = 0; dimIndex < timeAndDims.getNumDimensions(); dimIndex++) { - DimensionMerger merger = mergers.get(dimIndex); + DimensionMergerV9 merger = mergers.get(dimIndex); if (merger.hasOnlyNulls()) { continue; } @@ -763,17 +761,16 @@ private GenericColumnSerializer setupTimeWriter( private ArrayList setupMetricsWriters( final SegmentWriteOutMedium segmentWriteOutMedium, final List mergedMetrics, - final Map metricsValueTypes, - final Map metricTypeNames, + final Map> metricsTypes, final IndexSpec indexSpec 
) throws IOException { ArrayList metWriters = Lists.newArrayListWithCapacity(mergedMetrics.size()); for (String metric : mergedMetrics) { - ValueType type = metricsValueTypes.get(metric); + TypeSignature type = metricsTypes.get(metric); GenericColumnSerializer writer; - switch (type) { + switch (type.getType()) { case LONG: writer = createLongColumnSerializer(segmentWriteOutMedium, metric, indexSpec); break; @@ -784,10 +781,9 @@ private ArrayList setupMetricsWriters( writer = createDoubleColumnSerializer(segmentWriteOutMedium, metric, indexSpec); break; case COMPLEX: - final String typeName = metricTypeNames.get(metric); - ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); + ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(type.getComplexTypeName()); if (serde == null) { - throw new ISE("Unknown type[%s]", typeName); + throw new ISE("Unknown type[%s]", type.getComplexTypeName()); } writer = serde.getSerializer(segmentWriteOutMedium, metric); break; @@ -897,8 +893,7 @@ private void writeDimValuesAndSetupDimConversion( private void mergeCapabilities( final List adapters, final List mergedDimensions, - final Map metricsValueTypes, - final Map metricTypeNames, + final Map> metricTypes, final List dimCapabilities ) { @@ -911,12 +906,11 @@ private void mergeCapabilities( ); } for (String metric : adapter.getMetricNames()) { - ColumnCapabilities capabilities = adapter.getCapabilities(metric); - capabilitiesMap.compute(metric, (m, existingCapabilities) -> + final ColumnCapabilities capabilities = adapter.getCapabilities(metric); + final ColumnCapabilities merged = capabilitiesMap.compute(metric, (m, existingCapabilities) -> mergeCapabilities(capabilities, existingCapabilities, METRIC_CAPABILITY_MERGE_LOGIC) ); - metricsValueTypes.put(metric, capabilities.getType()); - metricTypeNames.put(metric, adapter.getMetricType(metric)); + metricTypes.put(metric, merged); } } for (String dim : mergedDimensions) { diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexableAdapter.java b/processing/src/main/java/org/apache/druid/segment/IndexableAdapter.java index 5cccab281f42..96bc27be5812 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexableAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexableAdapter.java @@ -49,8 +49,6 @@ public interface IndexableAdapter BitmapValues getBitmapValues(String dimension, int dictId); - String getMetricType(String metric); - ColumnCapabilities getCapabilities(String column); Metadata getMetadata(); diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java index 6e4775e70372..df64897eb7a6 100644 --- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java @@ -59,7 +59,7 @@ public class NestedDataColumnIndexer implements DimensionIndexer fieldPath, Object fieldValue) + public ProcessedLiteral processLiteralField(ArrayList fieldPath, Object fieldValue) { final String fieldName = NestedPathFinder.toNormalizedJsonPath(fieldPath); LiteralFieldIndexer fieldIndexer = fieldIndexers.get(fieldName); @@ -142,6 +142,29 @@ public DimensionSelector makeDimensionSelector( IncrementalIndex.DimensionDesc desc ) { + final int dimIndex = desc.getIndex(); + final ColumnValueSelector rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex); + if (rootLiteralSelector != null) { + return new BaseSingleValueDimensionSelector() + { + @Nullable + @Override + protected String getValue() + { + final Object o = rootLiteralSelector.getObject(); + if (o == null) { + return null; + } + return o.toString(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + }; + } throw new UnsupportedOperationException("Not supported"); } @@ -152,6 +175,11 @@ public 
ColumnValueSelector makeColumnValueSelector( ) { final int dimIndex = desc.getIndex(); + final ColumnValueSelector rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex); + if (rootLiteralSelector != null) { + return rootLiteralSelector; + } + return new ObjectColumnSelector() { @Override @@ -164,7 +192,12 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) @Override public StructuredData getObject() { - return (StructuredData) currEntry.get().getDims()[dimIndex]; + final Object[] dims = currEntry.get().getDims(); + if (0 <= dimIndex && dimIndex < dims.length) { + return (StructuredData) dims[dimIndex]; + } else { + return null; + } } @Override @@ -177,6 +210,22 @@ public Class classOfObject() @Override public ColumnCapabilities getColumnCapabilities() + { + if (fieldIndexers.size() == 1 && fieldIndexers.containsKey(NestedPathFinder.JSON_PATH_ROOT)) { + LiteralFieldIndexer rootField = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + if (rootField.isSingleType()) { + return ColumnCapabilitiesImpl.createDefault() + .setType(rootField.getTypes().getSingleType()) + .setHasNulls(hasNulls); + } + } + return ColumnCapabilitiesImpl.createDefault() + .setType(NestedDataComplexTypeSerde.TYPE) + .setHasNulls(hasNulls); + } + + @Override + public ColumnCapabilities getHandlerCapabilities() { return ColumnCapabilitiesImpl.createDefault() .setType(NestedDataComplexTypeSerde.TYPE) @@ -216,6 +265,57 @@ public Object convertUnsortedEncodedKeyComponentToActualList(StructuredData key) @Override public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues) { + final LiteralFieldIndexer rootIndexer = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + if (fieldIndexers.size() == 1 && rootIndexer != null && rootIndexer.isSingleType()) { + // for root only literals, makeColumnValueSelector and makeDimensionSelector automatically unwrap StructuredData + // we need to do the opposite here, wrapping selector 
values with a StructuredData so that they are consistently + // typed for the merger + return new ColumnValueSelector() + { + @Override + public boolean isNull() + { + return selectorWithUnsortedValues.isNull(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + selectorWithUnsortedValues.inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public StructuredData getObject() + { + return StructuredData.wrap(selectorWithUnsortedValues.getObject()); + } + + @Override + public float getFloat() + { + return selectorWithUnsortedValues.getFloat(); + } + + @Override + public double getDouble() + { + return selectorWithUnsortedValues.getDouble(); + } + + @Override + public long getLong() + { + return selectorWithUnsortedValues.getLong(); + } + + @Override + public Class classOfObject() + { + return StructuredData.class; + } + }; + } return selectorWithUnsortedValues; } @@ -230,7 +330,6 @@ public void fillBitmapsFromUnsortedEncodedKeyComponent( throw new UnsupportedOperationException("Not supported"); } - public void mergeFields(SortedMap mergedFields) { for (Map.Entry entry : fieldIndexers.entrySet()) { @@ -246,6 +345,86 @@ public GlobalDictionarySortedCollector getSortedCollector() return globalDictionary.getSortedCollector(); } + @Nullable + private ColumnValueSelector getRootLiteralValueSelector( + IncrementalIndexRowHolder currEntry, + int dimIndex + ) + { + if (fieldIndexers.size() > 1) { + return null; + } + final LiteralFieldIndexer root = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + if (root == null || !root.isSingleType()) { + return null; + } + return new ColumnValueSelector() + { + @Override + public boolean isNull() + { + final Object o = getObject(); + return !(o instanceof Number); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return ((Number) value).floatValue(); + } + + @Override + public double getDouble() + { + Object value = 
getObject(); + if (value == null) { + return 0; + } + return ((Number) value).doubleValue(); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return ((Number) value).longValue(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Nullable + @Override + public Object getObject() + { + final Object[] dims = currEntry.get().getDims(); + if (0 <= dimIndex && dimIndex < dims.length) { + final StructuredData data = (StructuredData) dims[dimIndex]; + if (data != null) { + return data.getValue(); + } + } + + return null; + } + + @Override + public Class classOfObject() + { + return Object.class; + } + }; + } static class LiteralFieldIndexer { @@ -258,7 +437,7 @@ static class LiteralFieldIndexer this.typeSet = new NestedLiteralTypeInfo.MutableTypeSet(); } - private int processValue(@Nullable Object value) + private StructuredDataProcessor.ProcessedLiteral processValue(@Nullable Object value) { // null value is always added to the global dictionary as id 0, so we can ignore them here if (value != null) { @@ -270,25 +449,39 @@ private int processValue(@Nullable Object value) case LONG: globalDimensionDictionary.addLongValue(eval.asLong()); typeSet.add(ColumnType.LONG); - return StructuredDataProcessor.getLongObjectEstimateSize(); + return new StructuredDataProcessor.ProcessedLiteral<>( + eval.asLong(), + StructuredDataProcessor.getLongObjectEstimateSize() + ); case DOUBLE: globalDimensionDictionary.addDoubleValue(eval.asDouble()); typeSet.add(ColumnType.DOUBLE); - return StructuredDataProcessor.getDoubleObjectEstimateSize(); + return new StructuredDataProcessor.ProcessedLiteral<>( + eval.asDouble(), + StructuredDataProcessor.getDoubleObjectEstimateSize() + ); case STRING: default: final String asString = eval.asString(); globalDimensionDictionary.addStringValue(asString); typeSet.add(ColumnType.STRING); - return 
StructuredDataProcessor.estimateStringSize(asString); + return new StructuredDataProcessor.ProcessedLiteral<>( + eval.asString(), + StructuredDataProcessor.estimateStringSize(asString) + ); } } - return 0; + return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL; } public NestedLiteralTypeInfo.MutableTypeSet getTypes() { return typeSet; } + + public boolean isSingleType() + { + return typeSet.getSingleType() != null; + } } } diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java index f2599b668164..bf672c35221a 100644 --- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java @@ -21,7 +21,6 @@ import com.google.common.collect.PeekingIterator; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.column.BaseColumn; @@ -51,15 +50,22 @@ public class NestedDataColumnMerger implements DimensionMergerV9 { private static final Logger log = new Logger(NestedDataColumnMerger.class); - public static final Comparator>> LONG_MERGING_COMPARATOR = - DictionaryMergingIterator.makePeekingComparator(); - public static final Comparator>> DOUBLE_MERGING_COMPARATOR = - DictionaryMergingIterator.makePeekingComparator(); + + public static final Comparator> STRING_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + public static final Comparator> LONG_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + public static final Comparator> DOUBLE_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); private final String name; + private final IndexSpec indexSpec; + private final SegmentWriteOutMedium 
segmentWriteOutMedium; + private final ProgressIndicator progressIndicator; private final Closer closer; - private NestedDataColumnSerializer serializer; + private ColumnDescriptor.Builder descriptorBuilder; + private GenericColumnSerializer serializer; public NestedDataColumnMerger( String name, @@ -71,7 +77,9 @@ public NestedDataColumnMerger( { this.name = name; - this.serializer = new NestedDataColumnSerializer(name, indexSpec, segmentWriteOutMedium, progressIndicator, closer); + this.indexSpec = indexSpec; + this.segmentWriteOutMedium = segmentWriteOutMedium; + this.progressIndicator = progressIndicator; this.closer = closer; } @@ -112,34 +120,50 @@ public void writeMergedValueDictionary(List adapters) throws I } } - serializer.open(); - serializer.serializeFields(mergedFields); - int cardinality = 0; + descriptorBuilder = new ColumnDescriptor.Builder(); + + final NestedDataColumnSerializer defaultSerializer = new NestedDataColumnSerializer( + name, + indexSpec, + segmentWriteOutMedium, + progressIndicator, + closer + ); + serializer = defaultSerializer; + + final ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.serializerBuilder() + .withTypeName(NestedDataComplexTypeSerde.TYPE_NAME) + .withDelegate(serializer) + .build(); + descriptorBuilder.setValueType(ValueType.COMPLEX) + .setHasMultipleValues(false) + .addSerde(partSerde); + + defaultSerializer.open(); + defaultSerializer.serializeFields(mergedFields); + if (numMergeIndex > 1) { - DictionaryMergingIterator dictionaryMergeIterator = new DictionaryMergingIterator<>( + SimpleDictionaryMergingIterator dictionaryMergeIterator = new SimpleDictionaryMergingIterator<>( sortedLookups, - StringDimensionMergerV9.DICTIONARY_MERGING_COMPARATOR, - true + STRING_MERGING_COMPARATOR ); - DictionaryMergingIterator longDictionaryMergeIterator = new DictionaryMergingIterator<>( + SimpleDictionaryMergingIterator longDictionaryMergeIterator = new SimpleDictionaryMergingIterator<>( sortedLongLookups, - 
LONG_MERGING_COMPARATOR, - true + LONG_MERGING_COMPARATOR ); - DictionaryMergingIterator doubleDictionaryMergeIterator = new DictionaryMergingIterator<>( + SimpleDictionaryMergingIterator doubleDictionaryMergeIterator = new SimpleDictionaryMergingIterator<>( sortedDoubleLookups, - DOUBLE_MERGING_COMPARATOR, - true + DOUBLE_MERGING_COMPARATOR ); - serializer.serializeStringDictionary(() -> dictionaryMergeIterator); - serializer.serializeLongDictionary(() -> longDictionaryMergeIterator); - serializer.serializeDoubleDictionary(() -> doubleDictionaryMergeIterator); + defaultSerializer.serializeStringDictionary(() -> dictionaryMergeIterator); + defaultSerializer.serializeLongDictionary(() -> longDictionaryMergeIterator); + defaultSerializer.serializeDoubleDictionary(() -> doubleDictionaryMergeIterator); cardinality = dictionaryMergeIterator.getCardinality(); } else if (numMergeIndex == 1) { - serializer.serializeStringDictionary(sortedLookup.getSortedStrings()); - serializer.serializeLongDictionary(sortedLookup.getSortedLongs()); - serializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles()); + defaultSerializer.serializeStringDictionary(sortedLookup.getSortedStrings()); + defaultSerializer.serializeLongDictionary(sortedLookup.getSortedLongs()); + defaultSerializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles()); cardinality = sortedLookup.size(); } @@ -184,12 +208,12 @@ private GlobalDictionarySortedCollector getSortedIndexesFromQueryableAdapter( closer.register(col); if (col instanceof CompressedNestedDataComplexColumn) { - return getSortedIndexFromV1QueryableAdapter(mergedFields, col); + return getSortedIndexFromV1QueryableAdapterNestedColumn(mergedFields, col); } return null; } - private GlobalDictionarySortedCollector getSortedIndexFromV1QueryableAdapter( + private GlobalDictionarySortedCollector getSortedIndexFromV1QueryableAdapterNestedColumn( SortedMap mergedFields, BaseColumn col ) @@ -244,15 +268,7 @@ public boolean hasOnlyNulls() @Override 
public ColumnDescriptor makeColumnDescriptor() { - return new ColumnDescriptor.Builder() - .setValueType(ValueType.COMPLEX) - .setHasMultipleValues(false) - .addSerde(ComplexColumnPartSerde.serializerBuilder() - .withTypeName(NestedDataComplexTypeSerde.TYPE_NAME) - .withDelegate(serializer) - .build() - ) - .build(); + return descriptorBuilder.build(); } private boolean allNull(Indexed dimValues) diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java index 395fe2663625..8011f1e47fd7 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndex.java @@ -38,7 +38,7 @@ * @see QueryableIndexStorageAdapter for query path adapter * @see QueryableIndexIndexableAdapter for indexing path adapter */ -public interface QueryableIndex extends Closeable +public interface QueryableIndex extends Closeable, ColumnInspector { Interval getDataInterval(); int getNumRows(); @@ -52,6 +52,7 @@ public interface QueryableIndex extends Closeable @Nullable ColumnHolder getColumnHolder(String columnName); + @Override @Nullable default ColumnCapabilities getColumnCapabilities(String column) { diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java index d7482d14041d..30f37a5f9d42 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java @@ -27,8 +27,10 @@ import org.apache.druid.segment.column.DictionaryEncodedColumn; import org.apache.druid.segment.column.ValueTypes; import org.apache.druid.segment.data.ReadableOffset; +import org.apache.druid.segment.nested.NestedDataComplexColumn; import javax.annotation.Nullable; +import 
java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -122,6 +124,8 @@ private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensi if (column instanceof DictionaryEncodedColumn) { return ((DictionaryEncodedColumn) column).makeDimensionSelector(offset, extractionFn); + } else if (column instanceof NestedDataComplexColumn) { + return ((NestedDataComplexColumn) column).makeDimensionSelector(Collections.emptyList(), offset, extractionFn); } else { return DimensionSelector.constant(null, extractionFn); } diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java index bb3fb1b5e501..a3ec8f3b9cb5 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java @@ -21,14 +21,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.column.BaseColumn; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnIndexSupplier; -import org.apache.druid.segment.column.ComplexColumn; import org.apache.druid.segment.column.DictionaryEncodedColumn; import org.apache.druid.segment.column.DictionaryEncodedValueIndex; import org.apache.druid.segment.data.BitmapValues; @@ -339,32 +337,10 @@ public void mark() } } - @Override - public String getMetricType(String metric) - { - final ColumnHolder columnHolder = input.getColumnHolder(metric); - - switch (columnHolder.getCapabilities().getType()) { - case FLOAT: - return "float"; - 
case LONG: - return "long"; - case DOUBLE: - return "double"; - case COMPLEX: { - try (ComplexColumn complexColumn = (ComplexColumn) columnHolder.getColumn()) { - return complexColumn.getTypeName(); - } - } - default: - throw new ISE("Unknown type[%s]", columnHolder.getCapabilities().asTypeString()); - } - } - @Override public ColumnCapabilities getCapabilities(String column) { - return input.getColumnHolder(column).getCapabilities(); + return input.getColumnHolder(column).getHandlerCapabilities(); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java index 64a8f56147d5..2a4fe4d5c72f 100644 --- a/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedColumnSelectorFactory.java @@ -33,6 +33,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.IndexedInts; import org.apache.druid.segment.data.RangeIndexedInts; +import org.apache.druid.segment.nested.StructuredData; import javax.annotation.Nullable; import java.util.ArrayList; @@ -500,9 +501,13 @@ private Object getCurrentValue() @Nullable private Number getCurrentValueAsNumber() { + final Object currentValue = getCurrentValue(); + if (currentValue instanceof StructuredData) { + return Rows.objectToNumber(columnName, ((StructuredData) currentValue).getValue(), throwParseExceptions); + } return Rows.objectToNumber( columnName, - getCurrentValue(), + currentValue, throwParseExceptions ); } diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleDictionaryMergingIterator.java b/processing/src/main/java/org/apache/druid/segment/SimpleDictionaryMergingIterator.java new file mode 100644 index 000000000000..99c37f065857 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/SimpleDictionaryMergingIterator.java @@ -0,0 +1,116 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.data.Indexed; + +import java.util.Comparator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.PriorityQueue; + +/** + * Use a {@link PriorityQueue} to merge sorted {@link Indexed} based value-lookups using {@link PeekingIterator} + */ +public class SimpleDictionaryMergingIterator> implements CloseableIterator +{ + public static > Comparator> makePeekingComparator() + { + return (lhs, rhs) -> { + T left = lhs.peek(); + T right = rhs.peek(); + if (left == null) { + //noinspection VariableNotUsedInsideIf + return right == null ? 
0 : -1; + } else if (right == null) { + return 1; + } else { + return left.compareTo(right); + } + }; + } + + protected final PriorityQueue> pQueue; + protected int counter; + + public SimpleDictionaryMergingIterator( + Indexed[] dimValueLookups, + Comparator> comparator + ) + { + pQueue = new PriorityQueue<>(dimValueLookups.length, comparator); + + for (Indexed dimValueLookup : dimValueLookups) { + if (dimValueLookup == null) { + continue; + } + final PeekingIterator iter = Iterators.peekingIterator(dimValueLookup.iterator()); + if (iter.hasNext()) { + pQueue.add(iter); + } + } + } + + @Override + public boolean hasNext() + { + return !pQueue.isEmpty(); + } + + @Override + public T next() + { + PeekingIterator smallest = pQueue.remove(); + if (smallest == null) { + throw new NoSuchElementException(); + } + final T value = smallest.next(); + if (smallest.hasNext()) { + pQueue.add(smallest); + } + + while (!pQueue.isEmpty() && Objects.equals(value, pQueue.peek().peek())) { + pQueue.remove(); + } + counter++; + + return value; + } + + public int getCardinality() + { + return counter; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("remove"); + } + + @Override + public void close() + { + // nothing to do + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java index 261e1c46ddc1..ea009a85a667 100644 --- a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java @@ -88,7 +88,7 @@ private Map initDimensionHandlers(Indexed avai Map dimensionHandlerMap = Maps.newLinkedHashMap(); for (String dim : availableDimensions) { final ColumnHolder columnHolder = getColumnHolder(dim); - ColumnCapabilities capabilities = columnHolder.getCapabilities(); + ColumnCapabilities capabilities = columnHolder.getHandlerCapabilities(); 
DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); dimensionHandlerMap.put(dim, handler); } diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java index 982aa94c572f..7176ad89debf 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnBuilder.java @@ -35,6 +35,9 @@ public class ColumnBuilder { private final ColumnCapabilitiesImpl capabilitiesBuilder = ColumnCapabilitiesImpl.createDefault(); + @Nullable + private ColumnCapabilities handlerCapabilities = null; + @Nullable private Supplier columnSupplier = null; @Nullable @@ -58,6 +61,12 @@ public SmooshedFileMapper getFileMapper() return this.fileMapper; } + public ColumnBuilder setType(ColumnType type) + { + this.capabilitiesBuilder.setType(type); + return this; + } + public ColumnBuilder setType(ValueType type) { this.capabilitiesBuilder.setType(ColumnTypeFactory.ofValueType(type)); @@ -138,11 +147,17 @@ public ColumnBuilder setHasNulls(ColumnCapabilities.Capable nullable) return this; } + public ColumnBuilder setHandlerCapabilities(ColumnCapabilities handlerCapabilities) + { + this.handlerCapabilities = handlerCapabilities; + return this; + } + public ColumnHolder build() { Preconditions.checkState(capabilitiesBuilder.getType() != null, "Type must be set."); - return new SimpleColumnHolder(capabilitiesBuilder, columnSupplier, indexSupplier); + return new SimpleColumnHolder(capabilitiesBuilder, handlerCapabilities, columnSupplier, indexSupplier); } private void checkColumnSupplierNotSet() diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java index 6e766d1e24a9..4ea27c495a32 100644 --- 
a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java @@ -35,20 +35,6 @@ */ public interface ColumnCapabilities extends TypeSignature { - /** - * Column type, good to know so caller can know what to expect and which optimal selector to use - */ - @Override - ValueType getType(); - - @Nullable - @Override - String getComplexTypeName(); - - @Nullable - @Override - TypeSignature getElementType(); - /** * Is the column dictionary encoded? If so, a DimensionDictionarySelector may be used instead of using a value * selector, allowing algorithms to operate on primitive integer dictionary ids rather than the looked up dictionary diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnHolder.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnHolder.java index a0109324b044..0a8cae83684e 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ColumnHolder.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnHolder.java @@ -39,6 +39,11 @@ static boolean storeDoubleAsFloat() ColumnCapabilities getCapabilities(); + default ColumnCapabilities getHandlerCapabilities() + { + return getCapabilities(); + } + int getLength(); BaseColumn getColumn(); diff --git a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java index a0e4dd029534..b1649e6dbbef 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java +++ b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java @@ -239,7 +239,7 @@ public ColumnCapabilities getColumnCapabilities(String column) if (columnType.isNumeric()) { return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(columnType); } else if (columnType.is(ValueType.COMPLEX)) { - return new 
ColumnCapabilitiesImpl().setType(columnType).setHasMultipleValues(false); + return ColumnCapabilitiesImpl.createDefault().setType(columnType).setHasNulls(true); } else { return new ColumnCapabilitiesImpl().setType(columnType); } diff --git a/processing/src/main/java/org/apache/druid/segment/column/SimpleColumnHolder.java b/processing/src/main/java/org/apache/druid/segment/column/SimpleColumnHolder.java index a7a8082daf07..9de07005afb5 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/SimpleColumnHolder.java +++ b/processing/src/main/java/org/apache/druid/segment/column/SimpleColumnHolder.java @@ -34,6 +34,8 @@ class SimpleColumnHolder implements ColumnHolder { private final ColumnCapabilities capabilities; + private final ColumnCapabilities handlerCapabilities; + @Nullable private final Supplier columnSupplier; @@ -45,6 +47,7 @@ class SimpleColumnHolder implements ColumnHolder SimpleColumnHolder( ColumnCapabilities capabilities, + @Nullable ColumnCapabilities handlerCapabilities, @Nullable Supplier columnSupplier, @Nullable ColumnIndexSupplier indexSupplier ) @@ -52,6 +55,8 @@ class SimpleColumnHolder implements ColumnHolder this.capabilities = capabilities; this.columnSupplier = columnSupplier; this.indexSupplier = indexSupplier; + this.handlerCapabilities = handlerCapabilities == null ? capabilities : handlerCapabilities; + // ColumnSupplier being null is sort of a rare case but can happen when a segment // was created, for example, using an aggregator that was removed in later versions. 
// In such cases we are not able to deserialize the column metadata and determine @@ -73,6 +78,12 @@ public ColumnCapabilities getCapabilities() return capabilities; } + @Override + public ColumnCapabilities getHandlerCapabilities() + { + return handlerCapabilities; + } + @Override public int getLength() { diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java index 1269fe1e6b3c..887c931204e8 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java @@ -42,6 +42,8 @@ public abstract class AppendableIndexBuilder protected boolean preserveExistingMetrics = false; protected boolean useMaxMemoryEstimates = true; + protected boolean useNestedColumnIndexerForSchemaDiscovery = false; + protected final Logger log = new Logger(this.getClass()); public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema) @@ -130,6 +132,14 @@ public AppendableIndexBuilder setUseMaxMemoryEstimates(final boolean useMaxMemor return this; } + public AppendableIndexBuilder setUseNestedColumnIndexerForSchemaDiscovery( + boolean useNestedColumnIndexerForSchemaDiscovery + ) + { + this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery; + return this; + } + public void validate() { if (maxRowCount <= 0) { diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java index 67cdabdf5673..3af6bfa4cbe8 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java @@ -32,4 +32,7 @@ public interface 
AppendableIndexSpec // Returns the default max bytes in memory for this index. long getDefaultMaxBytesInMemory(); + + @SuppressWarnings("unused") + boolean useNestedColumnIndexerForSchemaDiscovery(); } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 9bc1be2836fa..0085b58859e2 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -74,6 +74,7 @@ import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import org.apache.druid.segment.serde.ComplexMetricExtractor; import org.apache.druid.segment.serde.ComplexMetricSerde; import org.apache.druid.segment.serde.ComplexMetrics; @@ -241,6 +242,8 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final AtomicLong bytesInMemory = new AtomicLong(); private final boolean useMaxMemoryEstimates; + private final boolean useNestedColumnIndexerForSchemaDiscovery; + // This is modified on add() in a critical section. 
private final ThreadLocal in = new ThreadLocal<>(); private final Supplier rowSupplier = in::get; @@ -273,7 +276,8 @@ protected IncrementalIndex( final boolean deserializeComplexMetrics, final boolean concurrentEventAdd, final boolean preserveExistingMetrics, - final boolean useMaxMemoryEstimates + final boolean useMaxMemoryEstimates, + final boolean useNestedColumnIndexerForSchemaDiscovery ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -285,6 +289,7 @@ protected IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.preserveExistingMetrics = preserveExistingMetrics; this.useMaxMemoryEstimates = useMaxMemoryEstimates; + this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery; this.timeAndMetricsColumnCapabilities = new HashMap<>(); this.metricDescs = Maps.newLinkedHashMap(); @@ -454,13 +459,13 @@ public InputRow formatRow(InputRow row) return row; } - public Map getColumnCapabilities() + public Map getColumnHandlerCapabilities() { ImmutableMap.Builder builder = ImmutableMap.builder().putAll(timeAndMetricsColumnCapabilities); synchronized (dimensionDescs) { - dimensionDescs.forEach((dimension, desc) -> builder.put(dimension, desc.getCapabilities())); + dimensionDescs.forEach((dimension, desc) -> builder.put(dimension, desc.getIndexer().getHandlerCapabilities())); } return builder.build(); } @@ -473,11 +478,33 @@ public ColumnCapabilities getColumnCapabilities(String columnName) return timeAndMetricsColumnCapabilities.get(columnName); } synchronized (dimensionDescs) { - if (dimensionDescs.containsKey(columnName)) { - return dimensionDescs.get(columnName).getCapabilities(); + final DimensionDesc desc = dimensionDescs.get(columnName); + return desc != null ? 
desc.getCapabilities() : null; + } + } + + @Nullable + public ColumnCapabilities getColumnHandlerCapabilities(String columnName) + { + if (timeAndMetricsColumnCapabilities.containsKey(columnName)) { + final ColumnCapabilities capabilities = timeAndMetricsColumnCapabilities.get(columnName); + if (capabilities.is(ValueType.COMPLEX)) { + // normalize complex type name for these capabilities. the values in timeAndMetricsColumnCapabilities + // are direct from the AggregatorFactory, so might be too specific (think build vs merge aggs) + // for this method though, we want the 'normal' type name for the capabilities, since this is the true 'output' + // type of the column, so use the type from the MetricDesc instead, which is computed by round-tripping through + // something like ComplexMetrics.getSerdeForType(valueType.getComplexTypeName()).getTypeName() + return ColumnCapabilitiesImpl.copyOf(capabilities) + .setType(ColumnType.ofComplex(metricDescs.get(columnName).getType())); } + return capabilities; + } + + + synchronized (dimensionDescs) { + final DimensionDesc desc = dimensionDescs.get(columnName); + return desc != null ? desc.getIndexer().getHandlerCapabilities() : null; } - return null; } /** @@ -563,16 +590,22 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) absentDimensions.remove(dimension); } else { wasNewDim = true; - desc = addNewDimension( - dimension, - DimensionHandlerUtils.getHandlerFromCapabilities( - dimension, - // for schemaless type discovery, everything is a String. 
this should probably try to autodetect - // based on the value to use a better handler - makeDefaultCapabilitiesFromValueType(ColumnType.STRING), - null - ) - ); + final DimensionHandler handler; + if (useNestedColumnIndexerForSchemaDiscovery) { + handler = DimensionHandlerUtils.getHandlerFromCapabilities( + dimension, + makeDefaultCapabilitiesFromValueType(NestedDataComplexTypeSerde.TYPE), + null + ); + } else { + // legacy behavior: for schemaless type discovery, everything is a String + handler = DimensionHandlerUtils.getHandlerFromCapabilities( + dimension, + makeDefaultCapabilitiesFromValueType(ColumnType.STRING), + null + ); + } + desc = addNewDimension(dimension, handler); } DimensionIndexer indexer = desc.getIndexer(); Object dimsKey = null; @@ -784,13 +817,6 @@ public DimensionDesc getDimension(String dimension) } } - @Nullable - public String getMetricType(String metric) - { - final MetricDesc metricDesc = metricDescs.get(metric); - return metricDesc != null ? metricDesc.getType() : null; - } - public ColumnValueSelector makeMetricColumnValueSelector(String metric, IncrementalIndexRowHolder currEntry) { MetricDesc metricDesc = metricDescs.get(metric); @@ -857,7 +883,7 @@ private ColumnCapabilitiesImpl makeDefaultCapabilitiesFromValueType(ColumnType t .setDictionaryValuesUnique(true) .setDictionaryValuesSorted(false); case COMPLEX: - return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type).setHasNulls(true); + return ColumnCapabilitiesImpl.createDefault().setType(type).setHasNulls(true); default: return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(type); } @@ -1012,7 +1038,8 @@ public MetricDesc(int index, AggregatorFactory factory) capabilities = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(valueType); this.type = valueType.toString(); } else if (valueType.is(ValueType.COMPLEX)) { - capabilities = ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(valueType) + capabilities = 
ColumnCapabilitiesImpl.createDefault() + .setType(valueType) .setHasNulls(ColumnCapabilities.Capable.TRUE); ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(valueType.getComplexTypeName()); if (serde != null) { diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java index a1c40f3fb74a..2799bfc3cd65 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java @@ -176,16 +176,10 @@ public BitmapValues getBitmapValues(String dimension, int index) return new MutableBitmapValues(bitmapIndex); } - @Override - public String getMetricType(String metric) - { - return index.getMetricType(metric); - } - @Override public ColumnCapabilities getCapabilities(String column) { - return index.getColumnCapabilities(column); + return index.getColumnHandlerCapabilities(column); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 9ff678ee10d6..4e5fd8000b4a 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -34,6 +34,7 @@ import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionIndexer; import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.NestedDataColumnIndexer; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; @@ -42,6 +43,7 @@ import org.apache.druid.segment.data.Indexed; import 
org.apache.druid.segment.data.ListIndexed; import org.apache.druid.segment.filter.BooleanValueMatcher; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -207,6 +209,12 @@ public Comparable getMaxValue(String column) @Override public ColumnCapabilities getColumnCapabilities(String column) { + IncrementalIndex.DimensionDesc desc = index.getDimension(column); + // nested column indexer is a liar, and behaves like any type if it only processes unnested literals of a single type + // so keep it in the family so to speak + if (desc != null && desc.getIndexer() instanceof NestedDataColumnIndexer) { + return ColumnCapabilitiesImpl.createDefault().setType(NestedDataComplexTypeSerde.TYPE); + } // Different from index.getColumnCapabilities because, in a way, IncrementalIndex's string-typed dimensions // are always potentially multi-valued at query time. (Missing / null values for a row can potentially be // represented by an empty array; see StringDimensionIndexer.IndexerDimensionSelector's getRow method.) diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index f521c3913b02..1912136a0734 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -126,10 +126,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex // preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can have existing metrics // This is currently only used by auto compaction and should not be used for anything else. 
boolean preserveExistingMetrics, - boolean useMaxMemoryEstimates + boolean useMaxMemoryEstimates, + boolean useNestedColumnIndexerSchemaDiscovery ) { - super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates); + super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates, useNestedColumnIndexerSchemaDiscovery); this.maxRowCount = maxRowCount; this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) @@ -656,7 +657,8 @@ protected OnheapIncrementalIndex buildInner() maxRowCount, maxBytesInMemory, preserveExistingMetrics, - useMaxMemoryEstimates + useMaxMemoryEstimates, + useNestedColumnIndexerForSchemaDiscovery ); } } @@ -664,6 +666,7 @@ protected OnheapIncrementalIndex buildInner() public static class Spec implements AppendableIndexSpec { private static final boolean DEFAULT_PRESERVE_EXISTING_METRICS = false; + private static final boolean DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY = false; public static final String TYPE = "onheap"; // When set to true, for any row that already has metric (with the same name defined in metricSpec), @@ -673,17 +676,26 @@ public static class Spec implements AppendableIndexSpec // This is currently only used by auto compaction and should not be used for anything else. 
final boolean preserveExistingMetrics; + final boolean useNestedColumnIndexerForSchemaDiscovery; + public Spec() { this.preserveExistingMetrics = DEFAULT_PRESERVE_EXISTING_METRICS; + this.useNestedColumnIndexerForSchemaDiscovery = DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY; } @JsonCreator public Spec( - final @JsonProperty("preserveExistingMetrics") @Nullable Boolean preserveExistingMetrics + final @JsonProperty("preserveExistingMetrics") @Nullable Boolean preserveExistingMetrics, + final @JsonProperty("useNestedColumnIndexerForSchemaDiscovery") @Nullable Boolean useNestedColumnIndexerForSchemaDiscovery ) { - this.preserveExistingMetrics = preserveExistingMetrics != null ? preserveExistingMetrics : DEFAULT_PRESERVE_EXISTING_METRICS; + this.preserveExistingMetrics = preserveExistingMetrics != null + ? preserveExistingMetrics + : DEFAULT_PRESERVE_EXISTING_METRICS; + this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery != null + ? useNestedColumnIndexerForSchemaDiscovery + : DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY; } @JsonProperty @@ -695,7 +707,8 @@ public boolean isPreserveExistingMetrics() @Override public AppendableIndexBuilder builder() { - return new Builder().setPreserveExistingMetrics(preserveExistingMetrics); + return new Builder().setPreserveExistingMetrics(preserveExistingMetrics) + .setUseNestedColumnIndexerForSchemaDiscovery(useNestedColumnIndexerForSchemaDiscovery); } @Override @@ -707,6 +720,13 @@ public long getDefaultMaxBytesInMemory() return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6; } + @JsonProperty + @Override + public boolean useNestedColumnIndexerForSchemaDiscovery() + { + return useNestedColumnIndexerForSchemaDiscovery; + } + @Override public boolean equals(Object o) { @@ -717,13 +737,14 @@ public boolean equals(Object o) return false; } Spec spec = (Spec) o; - return preserveExistingMetrics == spec.preserveExistingMetrics; + return preserveExistingMetrics == 
spec.preserveExistingMetrics && + useNestedColumnIndexerForSchemaDiscovery == spec.useNestedColumnIndexerForSchemaDiscovery; } @Override public int hashCode() { - return Objects.hash(preserveExistingMetrics); + return Objects.hash(preserveExistingMetrics, useNestedColumnIndexerForSchemaDiscovery); } } } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java index c5aed95c00ec..88b8a5dd615b 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; @@ -69,6 +70,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -94,6 +96,8 @@ public abstract class CompressedNestedDataComplexColumn> doubleDictionarySupplier; private final SmooshedFileMapper fileMapper; + private final String rootFieldPath; + private final ConcurrentHashMap columns = new ConcurrentHashMap<>(); private static final ObjectStrategy STRATEGY = NestedDataComplexTypeSerde.INSTANCE.getObjectStrategy(); @@ -108,7 +112,8 @@ public CompressedNestedDataComplexColumn( Supplier stringDictionary, Supplier> longDictionarySupplier, Supplier> doubleDictionarySupplier, - SmooshedFileMapper fileMapper + SmooshedFileMapper fileMapper, + String rootFieldPath ) { this.metadata = metadata; @@ -121,6 +126,7 @@ public CompressedNestedDataComplexColumn( this.fileMapper = 
fileMapper; this.closer = Closer.create(); this.compressedRawColumnSupplier = compressedRawColumnSupplier; + this.rootFieldPath = rootFieldPath; } public abstract List parsePath(String path); @@ -188,9 +194,16 @@ public Object getRowValue(int rowNum) @Override public ColumnValueSelector makeColumnValueSelector(ReadableOffset offset) { + if (fields.size() == 1 && rootFieldPath.equals(fields.get(0))) { + return makeColumnValueSelector( + ImmutableList.of(), + offset + ); + } if (compressedRawColumn == null) { compressedRawColumn = closer.register(compressedRawColumnSupplier.get()); } + return new ObjectColumnSelector() { @Nullable @@ -221,6 +234,12 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) @Override public VectorObjectSelector makeVectorObjectSelector(ReadableVectorOffset offset) { + if (fields.size() == 1 && rootFieldPath.equals(fields.get(0))) { + return makeVectorObjectSelector( + Collections.emptyList(), + offset + ); + } if (compressedRawColumn == null) { compressedRawColumn = closer.register(compressedRawColumnSupplier.get()); } @@ -283,6 +302,17 @@ public int getMaxVectorSize() }; } + @Override + public VectorValueSelector makeVectorValueSelector(ReadableVectorOffset offset) + { + if (fields.size() == 1 && rootFieldPath.equals(fields.get(0))) { + return makeVectorValueSelector( + Collections.emptyList(), + offset + ); + } + return super.makeVectorValueSelector(offset); + } @Override public int getLength() diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java index 6da8472d2acd..35142bbc816c 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java @@ -81,7 +81,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer fieldPath, Object 
fieldValue) + public StructuredDataProcessor.ProcessedLiteral processLiteralField(ArrayList fieldPath, Object fieldValue) { final GlobalDictionaryEncodedFieldColumnWriter writer = fieldWriters.get( NestedPathFinder.toNormalizedJsonPath(fieldPath) @@ -91,13 +91,13 @@ public int processLiteralField(ArrayList fieldPath, Object field ExprEval eval = ExprEval.bestEffortOf(fieldValue); writer.addValue(rowCount, eval.value()); // serializer doesn't use size estimate - return 0; + return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL; } catch (IOException e) { throw new RuntimeException(":("); } } - return 0; + return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL; } }; @@ -272,7 +272,7 @@ public void serializeDoubleDictionary(Iterable dictionaryValues) throws @Override public void serialize(ColumnValueSelector selector) throws IOException { - StructuredData data = selector.getObject(); + StructuredData data = StructuredData.wrap(selector.getObject()); if (data == null) { nullRowsBitmap.add(rowCount); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java index 9c613e18538b..4e1ecbb994ed 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java @@ -40,6 +40,7 @@ import org.apache.druid.segment.data.FrontCodedIndexed; import org.apache.druid.segment.data.GenericIndexed; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; @@ -47,17 +48,20 @@ public class NestedDataColumnSupplier implements Supplier { private final byte version; private final NestedDataColumnMetadata metadata; - private final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier; - private final ImmutableBitmap nullValues; private final GenericIndexed fields; private final 
NestedLiteralTypeInfo fieldInfo; - private final GenericIndexed dictionary; - private final Supplier frontCodedDictionarySupplier; + private final CompressedVariableSizedBlobColumnSupplier compressedRawColumnSupplier; + private final ImmutableBitmap nullValues; + private final GenericIndexed stringDictionary; + private final Supplier frontCodedStringDictionarySupplier; private final Supplier> longDictionarySupplier; private final Supplier> doubleDictionarySupplier; private final ColumnConfig columnConfig; private final SmooshedFileMapper fileMapper; + @Nullable + private final ColumnType simpleType; + public NestedDataColumnSupplier( ByteBuffer bb, ColumnBuilder columnBuilder, @@ -91,6 +95,15 @@ public NestedDataColumnSupplier( fields = GenericIndexed.read(bb, GenericIndexed.STRING_STRATEGY, mapper); fieldInfo = NestedLiteralTypeInfo.read(bb, fields.size()); + if (fields.size() == 1 && + ((version == 0x03 && NestedPathFinder.JQ_PATH_ROOT.equals(fields.get(0))) || + (version == 0x04 && NestedPathFinder.JSON_PATH_ROOT.equals(fields.get(0)))) + ) { + simpleType = fieldInfo.getTypes(0).getSingleType(); + } else { + simpleType = null; + } + final ByteBuffer stringDictionaryBuffer = loadInternalFile( mapper, NestedDataColumnSerializer.STRING_DICTIONARY_FILE_NAME @@ -102,14 +115,17 @@ public NestedDataColumnSupplier( if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) { final byte encodingId = stringDictionaryBuffer.get(); if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) { - frontCodedDictionarySupplier = FrontCodedIndexed.read(stringDictionaryBuffer, metadata.getByteOrder()); - dictionary = null; + frontCodedStringDictionarySupplier = FrontCodedIndexed.read( + stringDictionaryBuffer, + metadata.getByteOrder() + ); + stringDictionary = null; } else if (encodingId == StringEncodingStrategy.UTF8_ID) { // this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but // this provides backwards compatibility should we 
switch at some point in the future to always // writing dictionaryVersion - dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper); - frontCodedDictionarySupplier = null; + stringDictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper); + frontCodedStringDictionarySupplier = null; } else { throw new ISE("impossible, unknown encoding strategy id: %s", encodingId); } @@ -118,8 +134,8 @@ public NestedDataColumnSupplier( // as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the // GenericIndexed version can be correctly read stringDictionaryBuffer.position(dictionaryStartPosition); - dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper); - frontCodedDictionarySupplier = null; + stringDictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.UTF8_STRATEGY, mapper); + frontCodedStringDictionarySupplier = null; } final ByteBuffer longDictionaryBuffer = loadInternalFile( mapper, @@ -151,6 +167,7 @@ public NestedDataColumnSupplier( mapper ); if (metadata.hasNulls()) { + columnBuilder.setHasNulls(true); final ByteBuffer nullIndexBuffer = loadInternalFile(mapper, NestedDataColumnSerializer.NULL_BITMAP_FILE_NAME); nullValues = metadata.getBitmapSerdeFactory().getObjectStrategy().fromByteBufferWithSize(nullIndexBuffer); } else { @@ -178,9 +195,15 @@ public ComplexColumn get() return makeV4(); } + @Nullable + public ColumnType getSimpleType() + { + return simpleType; + } + private NestedDataColumnV3 makeV3() { - if (frontCodedDictionarySupplier != null) { + if (frontCodedStringDictionarySupplier != null) { return new NestedDataColumnV3<>( metadata, columnConfig, @@ -188,7 +211,7 @@ private NestedDataColumnV3 makeV3() nullValues, fields, fieldInfo, - frontCodedDictionarySupplier, + frontCodedStringDictionarySupplier, longDictionarySupplier, doubleDictionarySupplier, fileMapper @@ -201,7 +224,7 @@ private 
NestedDataColumnV3 makeV3() nullValues, fields, fieldInfo, - dictionary::singleThreaded, + stringDictionary::singleThreaded, longDictionarySupplier, doubleDictionarySupplier, fileMapper @@ -210,7 +233,7 @@ private NestedDataColumnV3 makeV3() private NestedDataColumnV4 makeV4() { - if (frontCodedDictionarySupplier != null) { + if (frontCodedStringDictionarySupplier != null) { return new NestedDataColumnV4<>( metadata, columnConfig, @@ -218,7 +241,7 @@ private NestedDataColumnV4 makeV4() nullValues, fields, fieldInfo, - frontCodedDictionarySupplier, + frontCodedStringDictionarySupplier, longDictionarySupplier, doubleDictionarySupplier, fileMapper @@ -231,7 +254,7 @@ private NestedDataColumnV4 makeV4() nullValues, fields, fieldInfo, - dictionary::singleThreaded, + stringDictionary::singleThreaded, longDictionarySupplier, doubleDictionarySupplier, fileMapper diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV3.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV3.java index 0bac9fed3dd2..8c2cc91963c5 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV3.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV3.java @@ -58,7 +58,8 @@ public NestedDataColumnV3( stringDictionary, longDictionarySupplier, doubleDictionarySupplier, - fileMapper + fileMapper, + NestedPathFinder.JQ_PATH_ROOT ); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV4.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV4.java index cf9c2799b10e..96c5f56e2bd4 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV4.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnV4.java @@ -57,7 +57,8 @@ public NestedDataColumnV4( stringDictionary, longDictionarySupplier, doubleDictionarySupplier, - fileMapper + fileMapper, + NestedPathFinder.JSON_PATH_ROOT 
); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java index e4b73377969d..fec5dcfeaa11 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java @@ -88,8 +88,23 @@ public void deserializeColumn( capabilitiesBuilder.setDictionaryEncoded(true); capabilitiesBuilder.setDictionaryValuesSorted(true); capabilitiesBuilder.setDictionaryValuesUnique(true); - builder.setComplexTypeName(TYPE_NAME); + ColumnType simpleType = supplier.getSimpleType(); + if (simpleType != null) { + builder.setType(simpleType); + } else { + builder.setComplexTypeName(TYPE_NAME); + + } builder.setComplexColumnSupplier(supplier); + + // always use the nested column dimension handler, regardless what we claim our query time type is + builder.setHandlerCapabilities( + new ColumnCapabilitiesImpl().setType(TYPE) + .setDictionaryEncoded(true) + .setDictionaryValuesUnique(true) + .setDictionaryValuesSorted(true) + .setHasNulls(capabilitiesBuilder.hasNulls()) + ); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedPathFinder.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedPathFinder.java index 93872a2a4756..b275199bb0e7 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/NestedPathFinder.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedPathFinder.java @@ -30,17 +30,20 @@ public class NestedPathFinder { + public static final String JSON_PATH_ROOT = "$"; + public static final String JQ_PATH_ROOT = "."; + public static String toNormalizedJsonPath(List paths) { if (paths.isEmpty()) { - return "$"; + return JSON_PATH_ROOT; } StringBuilder bob = new StringBuilder(); boolean first = true; for (NestedPathPart partFinder : paths) { if 
(partFinder instanceof NestedPathField) { if (first) { - bob.append("$"); + bob.append(JSON_PATH_ROOT); } final String id = partFinder.getPartIdentifier(); if (id.contains(".") || id.contains("'") || id.contains("\"") || id.contains("[") || id.contains("]")) { @@ -51,7 +54,7 @@ public static String toNormalizedJsonPath(List paths) } } else if (partFinder instanceof NestedPathArrayElement) { if (first) { - bob.append("$"); + bob.append(JSON_PATH_ROOT); } bob.append("[").append(partFinder.getPartIdentifier()).append("]"); } @@ -70,7 +73,7 @@ public static List parseJsonPath(@Nullable String path) } List parts = new ArrayList<>(); - if (!path.startsWith("$")) { + if (!path.startsWith(JSON_PATH_ROOT)) { badFormatJsonPath(path, "must start with '$'"); } @@ -162,7 +165,7 @@ public static List parseJsonPath(@Nullable String path) public static String toNormalizedJqPath(List paths) { if (paths.isEmpty()) { - return "."; + return JQ_PATH_ROOT; } StringBuilder bob = new StringBuilder(); boolean first = true; @@ -172,7 +175,7 @@ public static String toNormalizedJqPath(List paths) bob.append("\"").append(partFinder.getPartIdentifier()).append("\""); } else { if (first) { - bob.append("."); + bob.append(JQ_PATH_ROOT); } bob.append("[").append(partFinder.getPartIdentifier()).append("]"); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java b/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java index 065e95506481..32513d62e025 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/StructuredData.java @@ -27,6 +27,7 @@ import org.apache.druid.segment.column.TypeStrategies; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Comparator; import java.util.Objects; import java.util.function.LongSupplier; @@ -172,6 +173,9 @@ public boolean equals(Object o) return false; } StructuredData that = 
(StructuredData) o; + if (value instanceof Object[] && that.value instanceof Object[]) { + return Arrays.deepEquals((Object[]) value, (Object[]) that.value); + } return Objects.equals(value, that.value); } diff --git a/processing/src/main/java/org/apache/druid/segment/nested/StructuredDataProcessor.java b/processing/src/main/java/org/apache/druid/segment/nested/StructuredDataProcessor.java index 2bca541ade47..8549deda44fd 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/StructuredDataProcessor.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/StructuredDataProcessor.java @@ -25,12 +25,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; public abstract class StructuredDataProcessor { - public abstract int processLiteralField(ArrayList fieldPath, Object fieldValue); + public abstract ProcessedLiteral processLiteralField(ArrayList fieldPath, Object fieldValue); /** * Process fields, returning a list of all paths to literal fields, represented as an ordered sequence of @@ -38,18 +39,18 @@ public abstract class StructuredDataProcessor */ public ProcessResults processFields(Object raw) { - Queue toProcess = new ArrayDeque<>(); + final Queue toProcess = new ArrayDeque<>(); raw = StructuredData.unwrap(raw); - ArrayList newPath = new ArrayList<>(); + final ArrayList newPath = new ArrayList<>(); if (raw instanceof Map) { toProcess.add(new MapField(newPath, (Map) raw)); } else if (raw instanceof List) { toProcess.add(new ListField(newPath, (List) raw)); } else { - return new ProcessResults().addLiteralField(newPath, processLiteralField(newPath, raw)); + return new ProcessResults().addLiteralField(newPath, processLiteralField(newPath, raw).getSize()); } - ProcessResults accumulator = new ProcessResults(); + final ProcessResults accumulator = new ProcessResults(); while (!toProcess.isEmpty()) { Field next = toProcess.poll(); @@ -65,7 +66,7 @@ 
public ProcessResults processFields(Object raw) private ProcessResults processMapField(Queue toProcess, MapField map) { // just guessing a size for a Map as some constant, it might be bigger than this... - ProcessResults processResults = new ProcessResults().withSize(16); + final ProcessResults processResults = new ProcessResults().withSize(16); for (Map.Entry entry : map.getMap().entrySet()) { // add estimated size of string key processResults.addSize(estimateStringSize(entry.getKey())); @@ -80,7 +81,7 @@ private ProcessResults processMapField(Queue toProcess, MapField map) toProcess.add(new MapField(newPath, (Map) value)); } else { // literals get processed - processResults.addLiteralField(newPath, processLiteralField(newPath, value)); + processResults.addLiteralField(newPath, processLiteralField(newPath, value).getSize()); } } return processResults; @@ -89,7 +90,7 @@ private ProcessResults processMapField(Queue toProcess, MapField map) private ProcessResults processListField(Queue toProcess, ListField list) { // start with object reference, is probably a bit bigger than this... 
- ProcessResults results = new ProcessResults().withSize(8); + final ProcessResults results = new ProcessResults().withSize(8); final List theList = list.getList(); for (int i = 0; i < theList.size(); i++) { final ArrayList newPath = new ArrayList<>(list.getPath()); @@ -102,7 +103,7 @@ private ProcessResults processListField(Queue toProcess, ListField list) toProcess.add(new ListField(newPath, (List) element)); } else { // literals get processed - results.addLiteralField(newPath, processLiteralField(newPath, element)); + results.addLiteralField(newPath, processLiteralField(newPath, element).getSize()); } } return results; @@ -155,6 +156,51 @@ static class MapField extends Field } } + public static class ProcessedLiteral + { + public static final ProcessedLiteral NULL_LITERAL = new ProcessedLiteral<>(null, 0); + @Nullable + private final T value; + private final int size; + + public ProcessedLiteral(@Nullable T value, int size) + { + this.value = value; + this.size = size; + } + + @SuppressWarnings("unused") + @Nullable + public T getValue() + { + return value; + } + + public int getSize() + { + return size; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ProcessedLiteral that = (ProcessedLiteral) o; + return size == that.size && Objects.equals(value, that.value); + } + + @Override + public int hashCode() + { + return Objects.hash(value, size); + } + } + /** * Accumulates the list of literal field paths and a rough size estimation for {@link StructuredDataProcessor} */ diff --git a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java index 68d2f210adae..04a676060c3e 100644 --- a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java +++ 
b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java @@ -29,8 +29,10 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.DictionaryEncodedColumn; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.nested.NestedDataComplexColumn; import javax.annotation.Nullable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.function.Function; @@ -161,14 +163,15 @@ public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(final throw new ISE("Column[%s] is multi-value, do not ask for a single-value selector", spec.getDimension()); } - @SuppressWarnings("unchecked") - final DictionaryEncodedColumn dictionaryEncodedColumn = (DictionaryEncodedColumn) - columnCache.getColumn(spec.getDimension()); - - // dictionaryEncodedColumn is not null because of holder null check above - assert dictionaryEncodedColumn != null; - final SingleValueDimensionVectorSelector selector = - dictionaryEncodedColumn.makeSingleValueDimensionVectorSelector(offset); + final BaseColumn column = columnCache.getColumn(spec.getDimension()); + final SingleValueDimensionVectorSelector selector; + if (column instanceof DictionaryEncodedColumn) { + selector = ((DictionaryEncodedColumn) column).makeSingleValueDimensionVectorSelector(offset); + } else if (column instanceof NestedDataComplexColumn) { + selector = ((NestedDataComplexColumn) column).makeSingleValueDimensionVectorSelector(Collections.emptyList(), offset); + } else { + selector = NilVectorSelector.create(offset); + } return spec.decorate(selector); }; diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java index c55f07c03075..54bfe1dcb59f 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java +++ 
b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java @@ -19,18 +19,55 @@ package org.apache.druid.segment; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.nested.StructuredData; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; +import javax.annotation.Nonnull; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + public class NestedDataColumnIndexerTest extends InitializedNullHandlingTest { + private static final String TIME_COL = "time"; + private static final String STRING_COL = "string"; + private static final String STRING_ARRAY_COL = "string_array"; + private static final String LONG_COL = "long"; + private static final String DOUBLE_COL = "double"; + private static final 
String VARIANT_COL = "variant"; + private static final String NESTED_COL = "nested"; + + @BeforeClass + public static void setup() + { + NestedDataModule.registerHandlersAndSerde(); + } + @Test - public void testStuff() + public void testKeySizeEstimation() { NestedDataColumnIndexer indexer = new NestedDataColumnIndexer(); Assert.assertEquals(0, indexer.getCardinality()); @@ -66,11 +103,17 @@ public void testStuff() Assert.assertEquals(56, key.getEffectiveSizeBytes()); Assert.assertEquals(5, indexer.getCardinality()); // new raw value, new fields - key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), false); + key = indexer.processRowValsToUnsortedEncodedKeyComponent( + ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), + false + ); Assert.assertEquals(286, key.getEffectiveSizeBytes()); Assert.assertEquals(5, indexer.getCardinality()); // new raw value - key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), false); + key = indexer.processRowValsToUnsortedEncodedKeyComponent( + ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), + false + ); Assert.assertEquals(118, key.getEffectiveSizeBytes()); Assert.assertEquals(5, indexer.getCardinality()); @@ -83,4 +126,459 @@ public void testStuff() Assert.assertEquals(6, indexer.getCardinality()); } } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootString() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, STRING_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, STRING_COL, "b")); + index.add(makeInputRow(minTimestamp + 3, true, STRING_COL, "c")); + index.add(makeInputRow(minTimestamp + 4, true, STRING_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, STRING_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter 
= new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_COL, STRING_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("a", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("a", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("a", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("b", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("b", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("b", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("c", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("c", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("c", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + 
valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootLong() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, LONG_COL, 1L)); + index.add(makeInputRow(minTimestamp + 2, true, LONG_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, LONG_COL, 3L)); + index.add(makeInputRow(minTimestamp + 4, true, LONG_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, LONG_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(LONG_COL, LONG_COL, ColumnType.LONG); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = 
columnSelectorFactory.makeColumnValueSelector(LONG_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(1L, valueSelector.getObject()); + Assert.assertEquals(1L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("1", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2L, valueSelector.getObject()); + Assert.assertEquals(2L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("2", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3L, valueSelector.getObject()); + Assert.assertEquals(3L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + 
Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootDouble() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, DOUBLE_COL, 1.1)); + index.add(makeInputRow(minTimestamp + 2, true, DOUBLE_COL, 2.2)); + index.add(makeInputRow(minTimestamp + 3, true, DOUBLE_COL, 3.3)); + index.add(makeInputRow(minTimestamp + 4, true, DOUBLE_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, DOUBLE_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(DOUBLE_COL, DOUBLE_COL, ColumnType.DOUBLE); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + DimensionSelector dimensionSelector = 
columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(1.1, valueSelector.getObject()); + Assert.assertEquals(1.1, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("1.1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("1.1", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2.2, valueSelector.getObject()); + Assert.assertEquals(2.2, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("2.2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("2.2", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3.3, valueSelector.getObject()); + Assert.assertEquals(3.3, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("3.3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("3.3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, 
dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootStringArray() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, STRING_ARRAY_COL, new String[]{"a"})); + index.add(makeInputRow(minTimestamp + 2, true, STRING_ARRAY_COL, new Object[]{"b", "c"})); + index.add(makeInputRow(minTimestamp + 3, true, STRING_ARRAY_COL, ImmutableList.of("d", "e"))); + index.add(makeInputRow(minTimestamp + 4, true, STRING_ARRAY_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, STRING_ARRAY_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_ARRAY_COL, STRING_ARRAY_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () 
-> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(new Object[]{"a"}), valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(new Object[]{"b", "c"}), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + // raw data is left as is, so is currently still a list while in incremental index... 
+ Assert.assertEquals(StructuredData.wrap(ImmutableList.of("d", "e")), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootVariant() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, VARIANT_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, VARIANT_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, VARIANT_COL, 3.3)); + index.add(makeInputRow(minTimestamp + 4, true, VARIANT_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, VARIANT_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(VARIANT_COL, VARIANT_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = 
columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap("a"), valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(2L), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(3.3), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryNested() throws IndexSizeExceededException + { + long 
minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, NESTED_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, NESTED_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, NESTED_COL, ImmutableMap.of("x", 1.1, "y", 2L))); + index.add(makeInputRow(minTimestamp + 4, true, NESTED_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, NESTED_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(NESTED_COL, NESTED_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap("a"), valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(2L), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> 
cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(ImmutableMap.of("x", 1.1, "y", 2L)), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + } + + @Nonnull + private static IncrementalIndex makeIncrementalIndex(long minTimestamp) + { + IncrementalIndex index = new OnheapIncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema( + minTimestamp, + new TimestampSpec(TIME_COL, "millis", null), + Granularities.NONE, + VirtualColumns.EMPTY, + new DimensionsSpec(Collections.emptyList()), + new AggregatorFactory[0], + false + ) + ) + .setMaxRowCount(1000) + .setUseNestedColumnIndexerForSchemaDiscovery(true) + .build(); + return index; + } + + private MapBasedInputRow makeInputRow( + long timestamp, + boolean explicitNull, + Object... 
kv + ) + { + HashMap event = new HashMap<>(); + event.put("time", timestamp); + Preconditions.checkArgument(kv.length % 2 == 0); + String currentKey = null; + for (int i = 0; i < kv.length; i++) { + if (i % 2 == 0) { + currentKey = (String) kv[i]; + } else { + if (explicitNull || kv[i] != null) { + event.put(currentKey, kv[i]); + } + } + } + + return new MapBasedInputRow(timestamp, ImmutableList.copyOf(event.keySet()), event); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java index 77309ff27777..382d7613cf62 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java @@ -33,6 +33,7 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.guice.NestedDataModule; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -105,6 +106,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest public IncrementalIndexTest(String indexType, String mode, boolean isPreserveExistingMetrics) throws JsonProcessingException { + NestedDataModule.registerHandlersAndSerde(); this.isPreserveExistingMetrics = isPreserveExistingMetrics; indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder .setSimpleTestingIndexSchema("rollup".equals(mode), isPreserveExistingMetrics, (AggregatorFactory[]) args[0]) diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java index 121f093b1a67..cde4c5c1fa32 100644 
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.guice.NestedDataModule; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.query.aggregation.Aggregator; @@ -58,6 +59,7 @@ public class IncrementalIndexIngestionTest extends InitializedNullHandlingTest public IncrementalIndexIngestionTest(String indexType) throws JsonProcessingException { + NestedDataModule.registerHandlersAndSerde(); indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder .setIndexSchema((IncrementalIndexSchema) args[0]) .setMaxRowCount(MAX_ROWS) diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java index c692fd4ecd2b..b980c0251b6a 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.NestedDataModule; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.CloserRule; @@ -55,6 +56,7 @@ public class IncrementalIndexMultiValueSpecTest extends InitializedNullHandlingT public 
IncrementalIndexMultiValueSpecTest(String indexType) throws JsonProcessingException { + NestedDataModule.registerHandlersAndSerde(); indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder .setIndexSchema((IncrementalIndexSchema) args[0]) .setMaxRowCount(10_000) diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index f59fc5462ce5..ae6622d1d77e 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -31,6 +31,7 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.Row; +import org.apache.druid.guice.NestedDataModule; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -98,6 +99,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT public IncrementalIndexStorageAdapterTest(String indexType) throws JsonProcessingException { + NestedDataModule.registerHandlersAndSerde(); indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) .setMaxRowCount(1_000) diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 02d43e275061..dad2a4c2134d 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ 
b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -125,7 +125,8 @@ public MapIncrementalIndex( maxRowCount, maxBytesInMemory, false, - true + true, + false ); } @@ -149,7 +150,8 @@ public MapIncrementalIndex( maxRowCount, maxBytesInMemory, false, - true + true, + false ); } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java index 7b1a7c54682b..ad8827f2a95b 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java @@ -88,7 +88,7 @@ public static ClientCompactionTaskQueryTuningConfig from( if (userCompactionTaskQueryTuningConfig == null) { return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, - new OnheapIncrementalIndex.Spec(preserveExistingMetrics), + new OnheapIncrementalIndex.Spec(preserveExistingMetrics, false), null, null, null, @@ -111,7 +111,7 @@ public static ClientCompactionTaskQueryTuningConfig from( } else { AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null ? 
userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() - : new OnheapIncrementalIndex.Spec(preserveExistingMetrics); + : new OnheapIncrementalIndex.Spec(preserveExistingMetrics, false); return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, appendableIndexSpecToUse, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java index 883d6f5a58b2..5a722e342e63 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java @@ -360,7 +360,7 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) QueryableIndex oldIndex = segment.asQueryableIndex(); for (String dim : oldIndex.getAvailableDimensions()) { dimOrder.add(dim); - oldCapabilities.put(dim, oldIndex.getColumnHolder(dim).getCapabilities()); + oldCapabilities.put(dim, oldIndex.getColumnHolder(dim).getHandlerCapabilities()); } } finally { @@ -369,7 +369,7 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) } else { IncrementalIndex oldIndex = lastHydrant.getIndex(); dimOrder.addAll(oldIndex.getDimensionOrder()); - oldCapabilities = oldIndex.getColumnCapabilities(); + oldCapabilities = oldIndex.getColumnHandlerCapabilities(); } newIndex.loadDimensionIterable(dimOrder, oldCapabilities); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index a0f4cf9a6101..4a48f3280c7c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -259,7 +259,7 @@ public void testSerdeUserCompactionTuningConfigWithAppendableIndexSpec() throws { final 
UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig( 40000, - new OnheapIncrementalIndex.Spec(true), + new OnheapIncrementalIndex.Spec(true, false), 2000L, null, new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java index 01c889ad2c3c..947328722964 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java @@ -78,7 +78,7 @@ public void testSerde() throws IOException { final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig( 40000, - new OnheapIncrementalIndex.Spec(true), + new OnheapIncrementalIndex.Spec(true, false), 2000L, null, new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index 73abcb8d5eac..b1f3ad99527c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -1523,7 +1523,7 @@ public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() null, new UserCompactionTaskQueryTuningConfig( null, - new OnheapIncrementalIndex.Spec(true), + new OnheapIncrementalIndex.Spec(true, false), null, 1000L, null, @@ -1558,7 +1558,7 @@ public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() null, new UserCompactionTaskQueryTuningConfig( null, - new OnheapIncrementalIndex.Spec(false), + new 
OnheapIncrementalIndex.Spec(false, false), null, 1000L, null, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java index 1bd1e5819315..c57611a1218f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java @@ -266,6 +266,8 @@ private Object coerce(final Object value, final SqlTypeName sqlType) coercedValue = ((NlsString) value).getValue(); } else if (value instanceof Number) { coercedValue = String.valueOf(value); + } else if (value instanceof Boolean) { + coercedValue = String.valueOf(value); } else if (value instanceof Collection) { // Iterate through the collection, coercing each value. Useful for handling selects of multi-value dimensions. final List valueStrings = ((Collection) value).stream() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java index 1e9239c70d07..a8d0c94df73e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java @@ -28,9 +28,7 @@ import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.MapInputRowParser; -import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimeAndDimsParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.DruidInjectorBuilder; @@ -133,10 +131,10 @@ public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest new TimestampSpec("t", "iso", null), 
DimensionsSpec.builder().setDimensions( ImmutableList.builder() - .add(new StringDimensionSchema("string")) + .add(new NestedDataDimensionSchema("string")) .add(new NestedDataDimensionSchema("nest")) .add(new NestedDataDimensionSchema("nester")) - .add(new LongDimensionSchema("long")) + .add(new NestedDataDimensionSchema("long")) .build() ).build() )); @@ -373,6 +371,78 @@ public void testGroupByRootPath() ); } + @Test + public void testGroupByRootSingleTypeLong() + { + testQuery( + "SELECT " + + "long, " + + "SUM(cnt) " + + "FROM druid.nested GROUP BY 1", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(DATA_SOURCE) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + dimensions( + new DefaultDimensionSpec("long", "d0", ColumnType.LONG) + ) + ) + .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{1L, 1L}, + new Object[]{2L, 2L}, + new Object[]{3L, 1L}, + new Object[]{4L, 1L}, + new Object[]{5L, 2L} + ), + RowSignature.builder() + .add("long", ColumnType.LONG) + .add("EXPR$1", ColumnType.LONG) + .build() + ); + } + + @Test + public void testGroupByRootSingleTypeString() + { + testQuery( + "SELECT " + + "string, " + + "SUM(cnt) " + + "FROM druid.nested GROUP BY 1", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(DATA_SOURCE) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + dimensions( + new DefaultDimensionSpec("string", "d0") + ) + ) + .setAggregatorSpecs(aggregators(new LongSumAggregatorFactory("a0", "cnt"))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"aaa", 2L}, + new Object[]{"bbb", 1L}, + new Object[]{"ccc", 1L}, + new Object[]{"ddd", 2L}, + new Object[]{"eee", 1L} + ), + RowSignature.builder() + .add("string", ColumnType.STRING) + .add("EXPR$1", 
ColumnType.LONG) + .build() + ); + } + @Test public void testGroupByJsonValues() {