From 904f99b559cd19e5317c4721d2e9189c274ec288 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 8 Sep 2023 15:57:10 -0700 Subject: [PATCH 1/8] longer compatibility window for nested column format v4 changes: * add back nested column v4 serializers * 'json' schema by default still uses the newer 'nested common format' used by 'auto', but now has an optional 'formatVersion' property which can be specified to override format versions on native ingest jobs * add system config to specify default column format stuff, 'druid.indexing.formats', and property 'druid.indexing.formats.nestedColumnFormatVersion' to specify system level preferred nested column format for friendly rolling upgrades from versions which do not support the newer 'nested common format' used by 'auto' --- .../data/input/impl/DimensionSchema.java | 3 +- .../apache/druid/guice/NestedDataModule.java | 62 +- .../segment/DefaultColumnFormatConfig.java | 74 +++ .../segment/NestedDataColumnHandlerV4.java | 105 +++ .../segment/NestedDataColumnIndexerV4.java | 501 ++++++++++++++ .../segment/NestedDataColumnMergerV4.java | 215 ++++++ .../druid/segment/NestedDataColumnSchema.java | 121 ++++ .../incremental/IncrementalIndexAdapter.java | 13 + .../IncrementalIndexStorageAdapter.java | 9 + .../nested/NestedDataColumnSerializerV4.java | 396 ++++++++++++ .../druid/guice/NestedDataModuleTest.java | 112 ++++ .../druid/query/NestedDataTestUtils.java | 30 + .../query/scan/NestedDataScanQueryTest.java | 30 + .../DefaultColumnFormatsConfigTest.java | 58 ++ .../NestedDataColumnIndexerV4Test.java | 612 ++++++++++++++++++ .../segment/NestedDataColumnSchemaTest.java | 85 +++ .../NestedDataColumnSupplierV4Test.java | 305 +++++++-- .../apache/druid/guice/StorageNodeModule.java | 2 + 18 files changed, 2685 insertions(+), 48 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java create mode 100644 
processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java create mode 100644 processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexerV4.java create mode 100644 processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java create mode 100644 processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java create mode 100644 processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java create mode 100644 processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java create mode 100644 processing/src/test/java/org/apache/druid/segment/DefaultColumnFormatsConfigTest.java create mode 100644 processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java create mode 100644 processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java index 379c8c21ac9f..9b692ecb7c55 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java @@ -32,6 +32,7 @@ import org.apache.druid.segment.AutoTypeColumnSchema; import org.apache.druid.segment.DimensionHandler; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.NestedDataColumnSchema; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.TypeSignature; import org.apache.druid.segment.column.ValueType; @@ -50,7 +51,7 @@ @JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class), - 
@JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = AutoTypeColumnSchema.class), + @JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataColumnSchema.class), @JsonSubTypes.Type(name = AutoTypeColumnSchema.TYPE, value = AutoTypeColumnSchema.class) }) public abstract class DimensionSchema diff --git a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java index 32564f4dd408..5de6f5ea65ea 100644 --- a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java +++ b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java @@ -24,9 +24,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.annotations.VisibleForTesting; import com.google.inject.Binder; +import com.google.inject.Provides; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.segment.DefaultColumnFormatConfig; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.NestedCommonFormatColumnHandler; +import org.apache.druid.segment.NestedDataColumnHandlerV4; import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import org.apache.druid.segment.nested.StructuredData; import org.apache.druid.segment.nested.StructuredDataJsonSerializer; @@ -47,20 +50,28 @@ public List getJacksonModules() @Override public void configure(Binder binder) { - registerHandlersAndSerde(); + registerSerde(); + // binding our side effect class to the lifecycle causes registerHandler to be called on service start, allowing + // use of the config to get the system default format version + LifecycleModule.register(binder, SideEffectHandlerRegisterer.class); } - - @VisibleForTesting - public static void registerHandlersAndSerde() + @Provides + @LazySingleton + public SideEffectHandlerRegisterer registerHandler(DefaultColumnFormatConfig formatsConfig) { - if 
(ComplexMetrics.getSerdeForType(NestedDataComplexTypeSerde.TYPE_NAME) == null) { - ComplexMetrics.registerSerde(NestedDataComplexTypeSerde.TYPE_NAME, NestedDataComplexTypeSerde.INSTANCE); + if (formatsConfig.getNestedColumnFormatVersion() != null && formatsConfig.getNestedColumnFormatVersion() == 4) { + DimensionHandlerUtils.registerDimensionHandlerProvider( + NestedDataComplexTypeSerde.TYPE_NAME, + NestedDataColumnHandlerV4::new + ); + } else { + DimensionHandlerUtils.registerDimensionHandlerProvider( + NestedDataComplexTypeSerde.TYPE_NAME, + NestedCommonFormatColumnHandler::new + ); } - DimensionHandlerUtils.registerDimensionHandlerProvider( - NestedDataComplexTypeSerde.TYPE_NAME, - NestedCommonFormatColumnHandler::new - ); + return new SideEffectHandlerRegisterer(); } public static List getJacksonModulesList() @@ -71,4 +82,35 @@ public static List getJacksonModulesList() .addSerializer(StructuredData.class, new StructuredDataJsonSerializer()) ); } + + /** + * Helper for wiring stuff up for tests + */ + @VisibleForTesting + public static void registerHandlersAndSerde() + { + registerSerde(); + DimensionHandlerUtils.registerDimensionHandlerProvider( + NestedDataComplexTypeSerde.TYPE_NAME, + NestedCommonFormatColumnHandler::new + ); + } + + private static void registerSerde() + { + if (ComplexMetrics.getSerdeForType(NestedDataComplexTypeSerde.TYPE_NAME) == null) { + ComplexMetrics.registerSerde(NestedDataComplexTypeSerde.TYPE_NAME, NestedDataComplexTypeSerde.INSTANCE); + } + } + + /** + * this is used as a vehicle to register the correct version of the system default nested column handler by side + * effect with the help of binding to {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} so that + * {@link #registerHandler(DefaultColumnFormatConfig)} can be called with the injected + * {@link DefaultColumnFormatConfig}. 
+ */ + public static class SideEffectHandlerRegisterer + { + // nothing to see here + } } diff --git a/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java new file mode 100644 index 000000000000..696827f7c452 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; +import java.util.Objects; + +public class DefaultColumnFormatConfig +{ + @JsonProperty("nestedColumnFormatVersion") + private final Integer nestedColumnFormatVersion; + + @JsonCreator + public DefaultColumnFormatConfig( + @JsonProperty("nestedColumnFormatVersion") @Nullable Integer nestedColumnFormatVersion + ) + { + this.nestedColumnFormatVersion = nestedColumnFormatVersion; + } + + @Nullable + @JsonProperty("nestedColumnFormatVersion") + public Integer getNestedColumnFormatVersion() + { + return nestedColumnFormatVersion; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultColumnFormatConfig that = (DefaultColumnFormatConfig) o; + return Objects.equals(nestedColumnFormatVersion, that.nestedColumnFormatVersion); + } + + @Override + public int hashCode() + { + return Objects.hash(nestedColumnFormatVersion); + } + + @Override + public String toString() + { + return "DefaultColumnFormatsConfig{" + + "nestedColumnFormatVersion=" + nestedColumnFormatVersion + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java new file mode 100644 index 000000000000..a63a18eac7d3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.nested.StructuredData; +import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; +import org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelector; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import java.util.Comparator; + +public class NestedDataColumnHandlerV4 implements DimensionHandler +{ + private static Comparator COMPARATOR = (s1, s2) -> + StructuredData.COMPARATOR.compare( + StructuredData.wrap(s1.getObject()), + StructuredData.wrap(s2.getObject()) + ); + + private final String name; + + public NestedDataColumnHandlerV4(String name) + { + this.name = name; + } + + @Override + public String getDimensionName() + { + return name; + } + + @Override + public DimensionSpec getDimensionSpec() + { + return new DefaultDimensionSpec(name, name, ColumnType.NESTED_DATA); + } + + @Override + public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities) + { + return new NestedDataColumnSchema(name, 4); + 
} + + @Override + public DimensionIndexer makeIndexer(boolean useMaxMemoryEstimates) + { + return new NestedDataColumnIndexerV4(); + } + + @Override + public DimensionMergerV9 makeMerger( + IndexSpec indexSpec, + SegmentWriteOutMedium segmentWriteOutMedium, + ColumnCapabilities capabilities, + ProgressIndicator progress, + Closer closer + ) + { + return new NestedDataColumnMergerV4(name, indexSpec, segmentWriteOutMedium, closer); + } + + @Override + public int getLengthOfEncodedKeyComponent(StructuredData dimVals) + { + // this is called in one place, OnheapIncrementalIndex, where returning 0 here means the value is null + // so the actual value we return here doesn't matter. we should consider refactoring this to a boolean + return 1; + } + + @Override + public Comparator getEncodedValueSelectorComparator() + { + return COMPARATOR; + } + + @Override + public SettableColumnValueSelector makeNewSettableEncodedValueSelector() + { + return new SettableObjectColumnValueSelector(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexerV4.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexerV4.java new file mode 100644 index 000000000000..ecaf89e84836 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexerV4.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.collections.bitmap.BitmapFactory; +import org.apache.druid.collections.bitmap.MutableBitmap; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnFormat; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.CloseableIndexed; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexRowHolder; +import org.apache.druid.segment.nested.FieldTypeInfo; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; +import org.apache.druid.segment.nested.NestedPathFinder; +import org.apache.druid.segment.nested.NestedPathPart; +import org.apache.druid.segment.nested.SortedValueDictionary; +import org.apache.druid.segment.nested.StructuredData; +import org.apache.druid.segment.nested.StructuredDataProcessor; +import org.apache.druid.segment.nested.ValueDictionary; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedMap; +import 
java.util.TreeMap; + +public class NestedDataColumnIndexerV4 implements DimensionIndexer +{ + private static final ColumnFormat FORMAT = new NestedDataComplexTypeSerde.NestedColumnFormatV4(); + + protected volatile boolean hasNulls = false; + + protected SortedMap fieldIndexers = new TreeMap<>(); + protected final ValueDictionary globalDictionary = new ValueDictionary(); + + int estimatedFieldKeySize = 0; + + protected final StructuredDataProcessor indexerProcessor = new StructuredDataProcessor() + { + @Override + public ProcessedValue processField(ArrayList fieldPath, @Nullable Object fieldValue) + { + // null value is always added to the global dictionary as id 0, so we can ignore them here + if (fieldValue != null) { + final String fieldName = NestedPathFinder.toNormalizedJsonPath(fieldPath); + ExprEval eval = ExprEval.bestEffortOf(fieldValue); + FieldIndexer fieldIndexer = fieldIndexers.get(fieldName); + if (fieldIndexer == null) { + estimatedFieldKeySize += StructuredDataProcessor.estimateStringSize(fieldName); + fieldIndexer = new FieldIndexer(globalDictionary); + fieldIndexers.put(fieldName, fieldIndexer); + } + return fieldIndexer.processValue(eval); + } + return ProcessedValue.NULL_LITERAL; + } + + @Nullable + @Override + public ProcessedValue processArrayField( + ArrayList fieldPath, + @Nullable List array + ) + { + // classic nested data column indexer does not handle arrays + return null; + } + }; + + @Override + public EncodedKeyComponent processRowValsToUnsortedEncodedKeyComponent( + @Nullable Object dimValues, + boolean reportParseExceptions + ) + { + final long oldDictSizeInBytes = globalDictionary.sizeInBytes(); + final int oldFieldKeySize = estimatedFieldKeySize; + final StructuredData data; + if (dimValues == null) { + hasNulls = true; + data = null; + } else if (dimValues instanceof StructuredData) { + data = (StructuredData) dimValues; + } else { + data = new StructuredData(dimValues); + } + StructuredDataProcessor.ProcessResults info = 
indexerProcessor.processFields(data == null ? null : data.getValue()); + // 'raw' data is currently preserved 'as-is', and not replaced with object references to the global dictionaries + long effectiveSizeBytes = info.getEstimatedSize(); + // then, we add the delta of size change to the global dictionaries to account for any new space added by the + // 'raw' data + effectiveSizeBytes += (globalDictionary.sizeInBytes() - oldDictSizeInBytes); + effectiveSizeBytes += (estimatedFieldKeySize - oldFieldKeySize); + return new EncodedKeyComponent<>(data, effectiveSizeBytes); + } + + @Override + public void setSparseIndexed() + { + this.hasNulls = true; + } + + @Override + public StructuredData getUnsortedEncodedValueFromSorted(StructuredData sortedIntermediateValue) + { + return sortedIntermediateValue; + } + + @Override + public CloseableIndexed getSortedIndexedValues() + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public StructuredData getMinValue() + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public StructuredData getMaxValue() + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public int getCardinality() + { + return globalDictionary.getCardinality(); + } + + @Override + public DimensionSelector makeDimensionSelector( + DimensionSpec spec, + IncrementalIndexRowHolder currEntry, + IncrementalIndex.DimensionDesc desc + ) + { + final int dimIndex = desc.getIndex(); + final ColumnValueSelector rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex); + if (rootLiteralSelector != null) { + return new BaseSingleValueDimensionSelector() + { + @Nullable + @Override + protected String getValue() + { + final Object o = rootLiteralSelector.getObject(); + if (o == null) { + return null; + } + return o.toString(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + }; + } + // column has nested data or is of mixed 
root type, cannot use + throw new UOE( + "makeDimensionSelector is not supported, column [%s] is [%s] typed and should only use makeColumnValueSelector", + spec.getOutputName(), + ColumnType.NESTED_DATA + ); + } + + @Override + public ColumnValueSelector makeColumnValueSelector( + IncrementalIndexRowHolder currEntry, + IncrementalIndex.DimensionDesc desc + ) + { + final int dimIndex = desc.getIndex(); + final ColumnValueSelector rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex); + if (rootLiteralSelector != null) { + return rootLiteralSelector; + } + + return new ObjectColumnSelector() + { + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Nullable + @Override + public StructuredData getObject() + { + final Object[] dims = currEntry.get().getDims(); + if (0 <= dimIndex && dimIndex < dims.length) { + return (StructuredData) dims[dimIndex]; + } else { + return null; + } + } + + @Override + public Class classOfObject() + { + return StructuredData.class; + } + }; + } + private ColumnType getLogicalType() + { + if (fieldIndexers.size() == 1 && fieldIndexers.containsKey(NestedPathFinder.JSON_PATH_ROOT)) { + FieldIndexer rootField = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + ColumnType singleType = rootField.getTypes().getSingleType(); + return singleType == null ? 
ColumnType.NESTED_DATA : singleType; + } + return ColumnType.NESTED_DATA; + } + + @Override + public ColumnCapabilities getColumnCapabilities() + { + return ColumnCapabilitiesImpl.createDefault() + .setType(getLogicalType()) + .setHasNulls(hasNulls); + } + + @Override + public ColumnFormat getFormat() + { + return FORMAT; + } + + public SortedValueDictionary getSortedValueLookups() + { + return globalDictionary.getSortedCollector(); + } + + public SortedMap getFieldTypeInfo() + { + TreeMap fields = new TreeMap<>(); + for (Map.Entry entry : fieldIndexers.entrySet()) { + // skip adding the field if no types are in the set, meaning only null values have been processed + if (!entry.getValue().getTypes().isEmpty()) { + fields.put(entry.getKey(), entry.getValue().getTypes()); + } + } + return fields; + } + + @Override + public int compareUnsortedEncodedKeyComponents( + @Nullable StructuredData lhs, + @Nullable StructuredData rhs + ) + { + return StructuredData.COMPARATOR.compare(lhs, rhs); + } + + @Override + public boolean checkUnsortedEncodedKeyComponentsEqual( + @Nullable StructuredData lhs, + @Nullable StructuredData rhs + ) + { + return Objects.equals(lhs, rhs); + } + + @Override + public int getUnsortedEncodedKeyComponentHashCode(@Nullable StructuredData key) + { + return Objects.hash(key); + } + + @Override + public Object convertUnsortedEncodedKeyComponentToActualList(StructuredData key) + { + return key; + } + + @Override + public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues) + { + final FieldIndexer rootIndexer = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + if (fieldIndexers.size() == 1 && rootIndexer != null && rootIndexer.isSingleType()) { + // for root only literals, makeColumnValueSelector and makeDimensionSelector automatically unwrap StructuredData + // we need to do the opposite here, wrapping selector values with a StructuredData so that they are consistently + // typed for the merger + return 
new ColumnValueSelector() + { + @Override + public boolean isNull() + { + return selectorWithUnsortedValues.isNull(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + selectorWithUnsortedValues.inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public StructuredData getObject() + { + return StructuredData.wrap(selectorWithUnsortedValues.getObject()); + } + + @Override + public float getFloat() + { + return selectorWithUnsortedValues.getFloat(); + } + + @Override + public double getDouble() + { + return selectorWithUnsortedValues.getDouble(); + } + + @Override + public long getLong() + { + return selectorWithUnsortedValues.getLong(); + } + + @Override + public Class classOfObject() + { + return StructuredData.class; + } + }; + } + return selectorWithUnsortedValues; + } + + @Override + public void fillBitmapsFromUnsortedEncodedKeyComponent( + StructuredData key, + int rowNum, + MutableBitmap[] bitmapIndexes, + BitmapFactory factory + ) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Nullable + private ColumnValueSelector getRootLiteralValueSelector( + IncrementalIndexRowHolder currEntry, + int dimIndex + ) + { + if (fieldIndexers.size() > 1) { + return null; + } + final FieldIndexer root = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + if (root == null || !root.isSingleType()) { + return null; + } + return new ColumnValueSelector() + { + @Override + public boolean isNull() + { + final Object o = getObject(); + return !(o instanceof Number); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return ((Number) value).floatValue(); + } + + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return ((Number) value).doubleValue(); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return 
((Number) value).longValue(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Nullable + @Override + public Object getObject() + { + final Object[] dims = currEntry.get().getDims(); + if (0 <= dimIndex && dimIndex < dims.length) { + final StructuredData data = (StructuredData) dims[dimIndex]; + if (data != null) { + return ExprEval.bestEffortOf(data.getValue()).valueOrDefault(); + } + } + + return null; + } + + @Override + public Class classOfObject() + { + return Object.class; + } + }; + } + + static class FieldIndexer + { + private final ValueDictionary valueDictionary; + private final FieldTypeInfo.MutableTypeSet typeSet; + + FieldIndexer(ValueDictionary valueDictionary) + { + this.valueDictionary = valueDictionary; + this.typeSet = new FieldTypeInfo.MutableTypeSet(); + } + + private StructuredDataProcessor.ProcessedValue processValue(ExprEval eval) + { + final ColumnType columnType = ExpressionType.toColumnType(eval.type()); + int sizeEstimate; + switch (columnType.getType()) { + case LONG: + typeSet.add(ColumnType.LONG); + sizeEstimate = valueDictionary.addLongValue(eval.asLong()); + return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(), sizeEstimate); + case DOUBLE: + typeSet.add(ColumnType.DOUBLE); + sizeEstimate = valueDictionary.addDoubleValue(eval.asDouble()); + return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(), sizeEstimate); + case STRING: + typeSet.add(ColumnType.STRING); + final String asString = eval.asString(); + sizeEstimate = valueDictionary.addStringValue(asString); + return new StructuredDataProcessor.ProcessedValue<>(asString, sizeEstimate); + default: + throw new IAE("Unhandled type: %s", columnType); + } + } + + public FieldTypeInfo.MutableTypeSet getTypes() + { + return typeSet; + } + + public boolean isSingleType() + { + return typeSet.getSingleType() != null; + } + } +} diff --git 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java new file mode 100644 index 000000000000..9a0dc139fbf7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.PeekingIterator; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.column.ColumnDescriptor; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.nested.FieldTypeInfo; +import org.apache.druid.segment.nested.NestedDataColumnSerializerV4; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; +import org.apache.druid.segment.nested.SortedValueDictionary; +import org.apache.druid.segment.serde.ComplexColumnPartSerde; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.IntBuffer; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +public class NestedDataColumnMergerV4 implements DimensionMergerV9 +{ + private static final Logger log = new Logger(NestedDataColumnMergerV4.class); + + public static final Comparator> STRING_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + public static final Comparator> LONG_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + public static final Comparator> DOUBLE_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + + private final String name; + private final IndexSpec indexSpec; + private final SegmentWriteOutMedium segmentWriteOutMedium; + private final Closer closer; + + private ColumnDescriptor.Builder descriptorBuilder; + private GenericColumnSerializer serializer; + + public NestedDataColumnMergerV4( + String name, + IndexSpec indexSpec, + SegmentWriteOutMedium segmentWriteOutMedium, + Closer closer + ) + { + + this.name = name; + this.indexSpec = indexSpec; + this.segmentWriteOutMedium = segmentWriteOutMedium; + this.closer = 
closer; + } + + @Override + public void writeMergedValueDictionary(List adapters) throws IOException + { + try { + long dimStartTime = System.currentTimeMillis(); + + int numMergeIndex = 0; + SortedValueDictionary sortedLookup = null; + final Indexed[] sortedLookups = new Indexed[adapters.size()]; + final Indexed[] sortedLongLookups = new Indexed[adapters.size()]; + final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()]; + + final SortedMap mergedFields = new TreeMap<>(); + + for (int i = 0; i < adapters.size(); i++) { + final IndexableAdapter adapter = adapters.get(i); + + final IndexableAdapter.NestedColumnMergable mergable = closer.register( + adapter.getNestedColumnMergeables(name) + ); + if (mergable == null) { + continue; + } + final SortedValueDictionary dimValues = mergable.getValueDictionary(); + + boolean allNulls = dimValues == null || dimValues.allNull(); + if (!allNulls) { + sortedLookup = dimValues; + mergable.mergeFieldsInto(mergedFields); + sortedLookups[i] = dimValues.getSortedStrings(); + sortedLongLookups[i] = dimValues.getSortedLongs(); + sortedDoubleLookups[i] = dimValues.getSortedDoubles(); + numMergeIndex++; + } + } + + descriptorBuilder = new ColumnDescriptor.Builder(); + + final NestedDataColumnSerializerV4 defaultSerializer = new NestedDataColumnSerializerV4( + name, + indexSpec, + segmentWriteOutMedium, + closer + ); + serializer = defaultSerializer; + + final ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.serializerBuilder() + .withTypeName(NestedDataComplexTypeSerde.TYPE_NAME) + .withDelegate(serializer) + .build(); + descriptorBuilder.setValueType(ValueType.COMPLEX) + .setHasMultipleValues(false) + .addSerde(partSerde); + + defaultSerializer.open(); + defaultSerializer.serializeFields(mergedFields); + + int stringCardinality; + int longCardinality; + int doubleCardinality; + if (numMergeIndex == 1) { + defaultSerializer.serializeDictionaries( + sortedLookup.getSortedStrings(), + sortedLookup.getSortedLongs(), + 
sortedLookup.getSortedDoubles() + ); + stringCardinality = sortedLookup.getStringCardinality(); + longCardinality = sortedLookup.getLongCardinality(); + doubleCardinality = sortedLookup.getDoubleCardinality(); + } else { + final SimpleDictionaryMergingIterator stringIterator = new SimpleDictionaryMergingIterator<>( + sortedLookups, + STRING_MERGING_COMPARATOR + ); + final SimpleDictionaryMergingIterator longIterator = new SimpleDictionaryMergingIterator<>( + sortedLongLookups, + LONG_MERGING_COMPARATOR + ); + final SimpleDictionaryMergingIterator doubleIterator = new SimpleDictionaryMergingIterator<>( + sortedDoubleLookups, + DOUBLE_MERGING_COMPARATOR + ); + defaultSerializer.serializeDictionaries( + () -> stringIterator, + () -> longIterator, + () -> doubleIterator + ); + stringCardinality = stringIterator.getCardinality(); + longCardinality = longIterator.getCardinality(); + doubleCardinality = doubleIterator.getCardinality(); + } + + log.debug( + "Completed dim[%s] conversions with string cardinality[%,d], long cardinality[%,d], double cardinality[%,d] in %,d millis.", + name, + stringCardinality, + longCardinality, + doubleCardinality, + System.currentTimeMillis() - dimStartTime + ); + } + catch (IOException ioe) { + log.error(ioe, "Failed to merge dictionary for column [%s]", name); + throw ioe; + } + } + + @Override + public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues( + int segmentIndex, + ColumnValueSelector source + ) + { + return source; + } + + @Override + public void processMergedRow(ColumnValueSelector selector) throws IOException + { + serializer.serialize(selector); + } + + @Override + public void writeIndexes(@Nullable List segmentRowNumConversions) + { + // fields write their own indexes + } + + @Override + public boolean hasOnlyNulls() + { + return false; + } + + @Override + public ColumnDescriptor makeColumnDescriptor() + { + return descriptorBuilder.build(); + } +} diff --git 
a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java new file mode 100644 index 000000000000..7b3e79c1dd5b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; + +import javax.annotation.Nullable; +import java.util.Objects; + +public class NestedDataColumnSchema extends DimensionSchema +{ + final int formatVersion; + + @JsonCreator + public NestedDataColumnSchema( + @JsonProperty("name") String name, + @JsonProperty("formatVersion") @Nullable Integer version, + @JacksonInject DefaultColumnFormatConfig defaultFormatConfig + ) + { + super(name, null, true); + if (version != null) { + formatVersion = version; + } else if (defaultFormatConfig.getNestedColumnFormatVersion() != null) { + formatVersion = defaultFormatConfig.getNestedColumnFormatVersion(); + } else { + // this is sort of a lie... 
it's not really v5 in the segment, rather its v0 of the 'nested common format' + // but as far as this is concerned it is v5 + formatVersion = 5; + } + } + + public NestedDataColumnSchema( + String name, + int version + ) + { + super(name, null, true); + this.formatVersion = version; + } + + @JsonProperty("formatVersion") + public int getFormatVersion() + { + return formatVersion; + } + + @Override + public String getTypeName() + { + return NestedDataComplexTypeSerde.TYPE_NAME; + } + + @Override + public ColumnType getColumnType() + { + return ColumnType.NESTED_DATA; + } + + @Override + public DimensionHandler getDimensionHandler() + { + if (formatVersion == 4) { + return new NestedDataColumnHandlerV4(getName()); + } else { + return new NestedCommonFormatColumnHandler(getName()); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + NestedDataColumnSchema that = (NestedDataColumnSchema) o; + return Objects.equals(formatVersion, that.formatVersion); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), formatVersion); + } + + @Override + public String toString() + { + return "NestedDataColumnSchema{" + + "formatVersion=" + formatVersion + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java index aaaedac580fb..a61ff334bb9e 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java @@ -27,6 +27,7 @@ import org.apache.druid.segment.IndexableAdapter; import org.apache.druid.segment.IntIteratorUtils; import org.apache.druid.segment.Metadata; +import 
org.apache.druid.segment.NestedDataColumnIndexerV4; import org.apache.druid.segment.TransformableRowIterator; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnFormat; @@ -150,6 +151,18 @@ public NestedColumnMergable getNestedColumnMergeables(String column) } final DimensionIndexer indexer = accessor.dimensionDesc.getIndexer(); + if (indexer instanceof NestedDataColumnIndexerV4) { + NestedDataColumnIndexerV4 nestedDataColumnIndexer = (NestedDataColumnIndexerV4) indexer; + + return new NestedColumnMergable( + nestedDataColumnIndexer.getSortedValueLookups(), + nestedDataColumnIndexer.getFieldTypeInfo(), + true, + false, + null + ); + } + if (indexer instanceof AutoTypeColumnIndexer) { AutoTypeColumnIndexer autoIndexer = (AutoTypeColumnIndexer) indexer; return new NestedColumnMergable( diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 9ff678ee10d6..565ec3294fba 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -34,11 +34,13 @@ import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionIndexer; import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.NestedDataColumnIndexerV4; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.data.ListIndexed; import 
org.apache.druid.segment.filter.BooleanValueMatcher; @@ -207,6 +209,13 @@ public Comparable getMaxValue(String column) @Override public ColumnCapabilities getColumnCapabilities(String column) { + IncrementalIndex.DimensionDesc desc = index.getDimension(column); + // nested column indexer is a liar, and behaves like any type if it only processes unnested literals of a single + // type, so force it to use nested column type + if (desc != null && desc.getIndexer() instanceof NestedDataColumnIndexerV4) { + return ColumnCapabilitiesImpl.createDefault().setType(ColumnType.NESTED_DATA); + } + // Different from index.getColumnCapabilities because, in a way, IncrementalIndex's string-typed dimensions // are always potentially multi-valued at query time. (Missing / null values for a row can potentially be // represented by an empty array; see StringDimensionIndexer.IndexerDimensionSelector's getRow method.) diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java new file mode 100644 index 000000000000..f10efd2d2413 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.nested; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.druid.collections.bitmap.ImmutableBitmap; +import org.apache.druid.collections.bitmap.MutableBitmap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.GenericColumnSerializer; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.StringEncodingStrategies; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.ByteBufferWriter; +import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.data.DictionaryWriter; +import org.apache.druid.segment.data.FixedIndexedWriter; +import org.apache.druid.segment.data.GenericIndexed; +import org.apache.druid.segment.data.GenericIndexedWriter; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import 
java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +public class NestedDataColumnSerializerV4 implements GenericColumnSerializer +{ + private static final Logger log = new Logger(NestedDataColumnSerializerV4.class); + public static final String STRING_DICTIONARY_FILE_NAME = "__stringDictionary"; + public static final String LONG_DICTIONARY_FILE_NAME = "__longDictionary"; + public static final String DOUBLE_DICTIONARY_FILE_NAME = "__doubleDictionary"; + public static final String RAW_FILE_NAME = "__raw"; + public static final String NULL_BITMAP_FILE_NAME = "__nullIndex"; + public static final String NESTED_FIELD_PREFIX = "__field_"; + + private final String name; + private final SegmentWriteOutMedium segmentWriteOutMedium; + private final IndexSpec indexSpec; + @SuppressWarnings("unused") + private final Closer closer; + + private final StructuredDataProcessor fieldProcessor = new StructuredDataProcessor() + { + @Override + public ProcessedValue processField(ArrayList fieldPath, @Nullable Object fieldValue) + { + final GlobalDictionaryEncodedFieldColumnWriter writer = fieldWriters.get( + NestedPathFinder.toNormalizedJsonPath(fieldPath) + ); + if (writer != null) { + try { + final ExprEval eval = ExprEval.bestEffortOf(fieldValue); + if (eval.type().isPrimitive() || eval.type().isPrimitiveArray()) { + writer.addValue(rowCount, eval.value()); + } else { + // behave consistently with nested column indexer, which defaults to string + writer.addValue(rowCount, eval.asString()); + } + // serializer doesn't use size estimate + return ProcessedValue.NULL_LITERAL; + } + catch (IOException e) { + throw new RE(e, "Failed to write field [%s], unhandled value", fieldPath); + } + } + return ProcessedValue.NULL_LITERAL; + } + + @Nullable + @Override + public ProcessedValue processArrayField( + ArrayList fieldPath, + @Nullable List array + ) + { + // classic nested column ingestion does 
not support array fields + return null; + } + }; + + private byte[] metadataBytes; + private DictionaryIdLookup globalDictionaryIdLookup; + private SortedMap fields; + private GenericIndexedWriter fieldsWriter; + private FieldTypeInfo.Writer fieldsInfoWriter; + private DictionaryWriter dictionaryWriter; + private FixedIndexedWriter longDictionaryWriter; + private FixedIndexedWriter doubleDictionaryWriter; + private CompressedVariableSizedBlobColumnSerializer rawWriter; + private ByteBufferWriter nullBitmapWriter; + private MutableBitmap nullRowsBitmap; + private Map> fieldWriters; + private int rowCount = 0; + private boolean closedForWrite = false; + + private boolean dictionarySerialized = false; + + public NestedDataColumnSerializerV4( + String name, + IndexSpec indexSpec, + SegmentWriteOutMedium segmentWriteOutMedium, + Closer closer + ) + { + this.name = name; + this.segmentWriteOutMedium = segmentWriteOutMedium; + this.indexSpec = indexSpec; + this.closer = closer; + this.globalDictionaryIdLookup = new DictionaryIdLookup(); + } + + @Override + public void open() throws IOException + { + fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY); + fieldsWriter.open(); + + fieldsInfoWriter = new FieldTypeInfo.Writer(segmentWriteOutMedium); + fieldsInfoWriter.open(); + + dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter( + indexSpec.getStringDictionaryEncoding(), + segmentWriteOutMedium, + name + ); + dictionaryWriter.open(); + + longDictionaryWriter = new FixedIndexedWriter<>( + segmentWriteOutMedium, + ColumnType.LONG.getStrategy(), + ByteOrder.nativeOrder(), + Long.BYTES, + true + ); + longDictionaryWriter.open(); + + doubleDictionaryWriter = new FixedIndexedWriter<>( + segmentWriteOutMedium, + ColumnType.DOUBLE.getStrategy(), + ByteOrder.nativeOrder(), + Double.BYTES, + true + ); + doubleDictionaryWriter.open(); + + rawWriter = new CompressedVariableSizedBlobColumnSerializer( + 
getInternalFileName(name, RAW_FILE_NAME), + segmentWriteOutMedium, + indexSpec.getJsonCompression() != null ? indexSpec.getJsonCompression() : CompressionStrategy.LZ4 + ); + rawWriter.open(); + + nullBitmapWriter = new ByteBufferWriter<>( + segmentWriteOutMedium, + indexSpec.getBitmapSerdeFactory().getObjectStrategy() + ); + nullBitmapWriter.open(); + + nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap(); + } + + public void serializeFields(SortedMap fields) throws IOException + { + this.fields = fields; + this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size()); + int ctr = 0; + for (Map.Entry field : fields.entrySet()) { + final String fieldName = field.getKey(); + final String fieldFileName = NESTED_FIELD_PREFIX + ctr++; + fieldsWriter.write(fieldName); + fieldsInfoWriter.write(field.getValue()); + final GlobalDictionaryEncodedFieldColumnWriter writer; + final ColumnType type = field.getValue().getSingleType(); + if (type != null) { + if (Types.is(type, ValueType.STRING)) { + writer = new ScalarStringFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } else if (Types.is(type, ValueType.LONG)) { + writer = new ScalarLongFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } else if (Types.is(type, ValueType.DOUBLE)) { + writer = new ScalarDoubleFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } else { + throw new ISE("Invalid field type [%s], how did this happen?", type); + } + } else { + writer = new VariantFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } + writer.open(); + fieldWriters.put(fieldName, writer); + } + } + + public void serializeDictionaries( + Iterable strings, + Iterable longs, + Iterable doubles + ) throws IOException + { + if 
(dictionarySerialized) { + throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name); + } + + // null is always 0 + dictionaryWriter.write(null); + globalDictionaryIdLookup.addString(null); + for (String value : strings) { + value = NullHandling.emptyToNullIfNeeded(value); + if (value == null) { + continue; + } + + dictionaryWriter.write(value); + globalDictionaryIdLookup.addString(value); + } + dictionarySerialized = true; + + for (Long value : longs) { + if (value == null) { + continue; + } + longDictionaryWriter.write(value); + globalDictionaryIdLookup.addLong(value); + } + + for (Double value : doubles) { + if (value == null) { + continue; + } + doubleDictionaryWriter.write(value); + globalDictionaryIdLookup.addDouble(value); + } + dictionarySerialized = true; + } + + @Override + public void serialize(ColumnValueSelector selector) throws IOException + { + final StructuredData data = StructuredData.wrap(selector.getObject()); + if (data == null) { + nullRowsBitmap.add(rowCount); + } + rawWriter.addValue(NestedDataComplexTypeSerde.INSTANCE.toBytes(data)); + if (data != null) { + fieldProcessor.processFields(data.getValue()); + } + rowCount++; + } + + @Override + public long getSerializedSize() throws IOException + { + if (!closedForWrite) { + closedForWrite = true; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IndexMerger.SERIALIZER_UTILS.writeString( + baos, + NestedDataComplexTypeSerde.OBJECT_MAPPER.writeValueAsString( + new NestedDataColumnMetadata( + ByteOrder.nativeOrder(), + indexSpec.getBitmapSerdeFactory(), + name, + !nullRowsBitmap.isEmpty() + ) + ) + ); + this.metadataBytes = baos.toByteArray(); + this.nullBitmapWriter.write(nullRowsBitmap); + } + + long size = 1; + size += metadataBytes.length; + if (fieldsWriter != null) { + size += fieldsWriter.getSerializedSize(); + } + if (fieldsInfoWriter != null) { + size += fieldsInfoWriter.getSerializedSize(); + } + // the value dictionaries, raw column, 
and null index are all stored in separate files + return size; + } + + @Override + public void writeTo( + WritableByteChannel channel, + FileSmoosher smoosher + ) throws IOException + { + Preconditions.checkState(closedForWrite, "Not closed yet!"); + Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?"); + // version 4 + channel.write(ByteBuffer.wrap(new byte[]{0x04})); + channel.write(ByteBuffer.wrap(metadataBytes)); + fieldsWriter.writeTo(channel, smoosher); + fieldsInfoWriter.writeTo(channel, smoosher); + + // version 3 stores large components in separate files to prevent exceeding smoosh file limit (int max) + writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME); + writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME); + writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME); + writeInternal(smoosher, rawWriter, RAW_FILE_NAME); + if (!nullRowsBitmap.isEmpty()) { + writeInternal(smoosher, nullBitmapWriter, NULL_BITMAP_FILE_NAME); + } + + + // close the SmooshedWriter since we are done here, so we don't write to a temporary file per sub-column + // In the future, it would be best if the writeTo() itself didn't take a channel but was expected to actually + // open its own channels on the FileSmoosher object itself. Or some other thing that gives this Serializer + // total control over when resources are opened up and when they are closed. Until then, we are stuck + // with a very tight coupling of this code with how the external "driver" is working.
+ if (channel instanceof SmooshedWriter) { + channel.close(); + } + + for (Map.Entry field : fields.entrySet()) { + // remove writer so that it can be collected when we are done with it + GlobalDictionaryEncodedFieldColumnWriter writer = fieldWriters.remove(field.getKey()); + writer.writeTo(rowCount, smoosher); + } + log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size()); + } + + private void writeInternal(FileSmoosher smoosher, Serializer serializer, String fileName) throws IOException + { + final String internalName = getInternalFileName(name, fileName); + try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) { + serializer.writeTo(smooshChannel, smoosher); + } + } + + public static String getInternalFileName(String fileNameBase, String field) + { + return StringUtils.format("%s.%s", fileNameBase, field); + } +} diff --git a/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java b/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java new file mode 100644 index 000000000000..8cc7e4cda85c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.segment.DefaultColumnFormatConfig; +import org.apache.druid.segment.DimensionHandlerProvider; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.NestedCommonFormatColumnHandler; +import org.apache.druid.segment.NestedDataColumnHandlerV4; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.Properties; + +public class NestedDataModuleTest +{ + @Nullable + private static DimensionHandlerProvider DEFAULT_HANDLER_PROVIDER; + + @BeforeClass + public static void setup() + { + DEFAULT_HANDLER_PROVIDER = DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.get( + NestedDataComplexTypeSerde.TYPE_NAME + ); + } + @AfterClass + public static void restore() + { + if (DEFAULT_HANDLER_PROVIDER == null) { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); + } else { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.put( + NestedDataComplexTypeSerde.TYPE_NAME, + DEFAULT_HANDLER_PROVIDER + ); + } + } + + @Test + public void testDefaults() + { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); + Properties props = new Properties(); + Injector gadget = makeInjector(props); + + // side effects + gadget.getInstance(NestedDataModule.SideEffectHandlerRegisterer.class); + + DimensionHandlerProvider provider = DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.get( + NestedDataComplexTypeSerde.TYPE_NAME + ); + Assert.assertTrue(provider.get("test") instanceof NestedCommonFormatColumnHandler); + } + + @Test + public void 
testOverride() + { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); + Properties props = new Properties(); + props.put("druid.indexing.formats.nestedColumnFormatVersion", "4"); + Injector gadget = makeInjector(props); + + // side effects + gadget.getInstance(NestedDataModule.SideEffectHandlerRegisterer.class); + + DimensionHandlerProvider provider = DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.get( + NestedDataComplexTypeSerde.TYPE_NAME + ); + Assert.assertTrue(provider.get("test") instanceof NestedDataColumnHandlerV4); + } + + private Injector makeInjector(Properties props) + { + + StartupInjectorBuilder bob = new StartupInjectorBuilder().forTests().withProperties(props); + + bob.addAll( + ImmutableList.of( + binder -> { + JsonConfigProvider.bind(binder, "druid.indexing.formats", DefaultColumnFormatConfig.class); + }, + new NestedDataModule() + ) + ); + + return bob.build(); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java index 2084af4b6a91..e5de63437358 100644 --- a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java +++ b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java @@ -44,6 +44,7 @@ import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.NestedDataColumnSchema; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; @@ -113,6 +114,20 @@ public class NestedDataTestUtils ) ) .build(); + + public static final DimensionsSpec TSV_V4_SCHEMA = + DimensionsSpec.builder() + .setDimensions( + Arrays.asList( + new NestedDataColumnSchema("dim", 4), + new NestedDataColumnSchema("nest_json", 4), + new NestedDataColumnSchema("nester_json", 4), + new 
NestedDataColumnSchema("variant_json", 4), + new NestedDataColumnSchema("list_json", 4), + new NestedDataColumnSchema("nonexistent", 4) + ) + ) + .build(); public static final InputRowSchema AUTO_SCHEMA = new InputRowSchema( TIMESTAMP_SPEC, AUTO_DISCOVERY, @@ -169,6 +184,21 @@ public static List createSimpleSegmentsTsv( ); } + public static List createSimpleSegmentsTsvV4( + TemporaryFolder tempFolder, + Closer closer + ) + throws Exception + { + return createSimpleNestedTestDataTsvSegments( + tempFolder, + closer, + Granularities.NONE, + TSV_V4_SCHEMA, + true + ); + } + public static List createSimpleNestedTestDataTsvSegments( TemporaryFolder tempFolder, Closer closer, diff --git a/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java index d2666ee32ff8..0d91e7d5e001 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java @@ -265,6 +265,36 @@ public void testIngestAndScanSegmentsRealtimeWithFallback() throws Exception Assert.assertEquals(resultsSegments.get(0).getEvents().toString(), resultsRealtime.get(0).getEvents().toString()); } + @Test + public void testIngestAndScanSegmentsTsvV4() throws Exception + { + Query scanQuery = Druids.newScanQueryBuilder() + .dataSource("test_datasource") + .intervals( + new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.ETERNITY) + ) + ) + .virtualColumns( + new NestedFieldVirtualColumn("nest", "$.x", "x"), + new NestedFieldVirtualColumn("nester", "$.x[0]", "x_0"), + new NestedFieldVirtualColumn("nester", "$.y.c[1]", "y_c_1") + ) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .limit(100) + .context(ImmutableMap.of()) + .build(); + List segs = NestedDataTestUtils.createSimpleSegmentsTsvV4(tempFolder, closer); + + final Sequence seq = 
helper.runQueryOnSegmentsObjs(segs, scanQuery); + + List results = seq.toList(); + Assert.assertEquals(1, results.size()); + Assert.assertEquals(8, ((List) results.get(0).getEvents()).size()); + logResults(results); + } + + @Test public void testIngestAndScanSegmentsTsv() throws Exception { diff --git a/processing/src/test/java/org/apache/druid/segment/DefaultColumnFormatsConfigTest.java b/processing/src/test/java/org/apache/druid/segment/DefaultColumnFormatsConfigTest.java new file mode 100644 index 000000000000..f259fc1dc237 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/DefaultColumnFormatsConfigTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class DefaultColumnFormatsConfigTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testDefaultsSerde() throws JsonProcessingException + { + DefaultColumnFormatConfig defaultColumnFormatConfig = new DefaultColumnFormatConfig(null); + String there = MAPPER.writeValueAsString(defaultColumnFormatConfig); + DefaultColumnFormatConfig andBack = MAPPER.readValue(there, DefaultColumnFormatConfig.class); + Assert.assertEquals(defaultColumnFormatConfig, andBack); + Assert.assertNull(andBack.getNestedColumnFormatVersion()); + } + + @Test + public void testDefaultsSerdeOverride() throws JsonProcessingException + { + DefaultColumnFormatConfig defaultColumnFormatConfig = new DefaultColumnFormatConfig(4); + String there = MAPPER.writeValueAsString(defaultColumnFormatConfig); + DefaultColumnFormatConfig andBack = MAPPER.readValue(there, DefaultColumnFormatConfig.class); + Assert.assertEquals(defaultColumnFormatConfig, andBack); + Assert.assertEquals(4, (int) andBack.getNestedColumnFormatVersion()); + } + + @Test + public void testEqualsAndHashcode() + { + EqualsVerifier.forClass(DefaultColumnFormatConfig.class).usingGetClass().verify(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java new file mode 100644 index 000000000000..b43616b4a4a8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; +import org.apache.druid.segment.nested.StructuredData; +import org.apache.druid.testing.InitializedNullHandlingTest; 
+import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.List; +import java.util.Map; + +public class NestedDataColumnIndexerV4Test extends InitializedNullHandlingTest +{ + private static final String TIME_COL = "time"; + private static final String STRING_COL = "string"; + private static final String STRING_ARRAY_COL = "string_array"; + private static final String LONG_COL = "long"; + private static final String DOUBLE_COL = "double"; + private static final String VARIANT_COL = "variant"; + private static final String NESTED_COL = "nested"; + + @BeforeClass + public static void setup() + { + NestedDataModule.registerHandlersAndSerde(); + } + + @Test + public void testKeySizeEstimation() + { + NestedDataColumnIndexerV4 indexer = new NestedDataColumnIndexerV4(); + int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2; + Assert.assertEquals(baseCardinality, indexer.getCardinality()); + + EncodedKeyComponent key; + // new raw value, new field, new dictionary entry + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false); + Assert.assertEquals(228, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 1, indexer.getCardinality()); + // adding same value only adds estimated size of value itself + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false); + Assert.assertEquals(112, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 1, indexer.getCardinality()); + // new raw value, new field, new dictionary entry + key = indexer.processRowValsToUnsortedEncodedKeyComponent(10L, false); + Assert.assertEquals(94, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 2, indexer.getCardinality()); + // adding same value only adds estimated size of value itself + key = indexer.processRowValsToUnsortedEncodedKeyComponent(10L, false); + Assert.assertEquals(16, 
key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 2, indexer.getCardinality()); + // new raw value, new dictionary entry + key = indexer.processRowValsToUnsortedEncodedKeyComponent(11L, false); + Assert.assertEquals(48, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 3, indexer.getCardinality()); + + // new raw value, new fields + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(1L, 2L, 10L), false); + Assert.assertEquals(276, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + // new raw value, re-use fields and dictionary + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(1L, 2L, 10L), false); + Assert.assertEquals(56, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + // new raw value, new fields + key = indexer.processRowValsToUnsortedEncodedKeyComponent( + ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), + false + ); + Assert.assertEquals(286, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + // new raw value + key = indexer.processRowValsToUnsortedEncodedKeyComponent( + ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), + false + ); + Assert.assertEquals(118, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + + key = indexer.processRowValsToUnsortedEncodedKeyComponent("", false); + if (NullHandling.replaceWithDefault()) { + Assert.assertEquals(0, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 6, indexer.getCardinality()); + } else { + Assert.assertEquals(104, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 6, indexer.getCardinality()); + } + + key = indexer.processRowValsToUnsortedEncodedKeyComponent(0, false); + if (NullHandling.replaceWithDefault()) { + Assert.assertEquals(16, key.getEffectiveSizeBytes()); + 
Assert.assertEquals(baseCardinality + 6, indexer.getCardinality()); + } else { + Assert.assertEquals(48, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 7, indexer.getCardinality()); + } + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootString() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, STRING_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, STRING_COL, "b")); + index.add(makeInputRow(minTimestamp + 3, true, STRING_COL, "c")); + index.add(makeInputRow(minTimestamp + 4, true, STRING_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, STRING_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_COL, STRING_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("a", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("a", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("a", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + 
Assert.assertEquals("b", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("b", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("b", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("c", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("c", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("c", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootLong() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, 
true, LONG_COL, 1L)); + index.add(makeInputRow(minTimestamp + 2, true, LONG_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, LONG_COL, 3L)); + index.add(makeInputRow(minTimestamp + 4, true, LONG_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, LONG_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(LONG_COL, LONG_COL, ColumnType.LONG); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(1L, valueSelector.getObject()); + Assert.assertEquals(1L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("1", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2L, valueSelector.getObject()); + Assert.assertEquals(2L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("2", dimensionSelector.getObject()); + + columnSelectorFactory = 
cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3L, valueSelector.getObject()); + Assert.assertEquals(3L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultLongValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultLongValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultLongValue()), dimensionSelector.getObject()); + } + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, 
dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultLongValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultLongValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultLongValue()), dimensionSelector.getObject()); + } + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootDouble() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, DOUBLE_COL, 1.1)); + index.add(makeInputRow(minTimestamp + 2, true, DOUBLE_COL, 2.2)); + index.add(makeInputRow(minTimestamp + 3, true, DOUBLE_COL, 3.3)); + index.add(makeInputRow(minTimestamp + 4, true, DOUBLE_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, DOUBLE_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(DOUBLE_COL, DOUBLE_COL, ColumnType.DOUBLE); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(1.1, valueSelector.getObject()); + Assert.assertEquals(1.1, 
valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("1.1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("1.1", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2.2, valueSelector.getObject()); + Assert.assertEquals(2.2, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("2.2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("2.2", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3.3, valueSelector.getObject()); + Assert.assertEquals(3.3, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("3.3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("3.3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + 
Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultDoubleValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultDoubleValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultDoubleValue()), dimensionSelector.getObject()); + } + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultDoubleValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultDoubleValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultDoubleValue()), dimensionSelector.getObject()); + } + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootStringArray() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, STRING_ARRAY_COL, new String[]{"a"})); + index.add(makeInputRow(minTimestamp + 2, true, 
STRING_ARRAY_COL, new Object[]{"b", "c"})); + index.add(makeInputRow(minTimestamp + 3, true, STRING_ARRAY_COL, ImmutableList.of("d", "e"))); + index.add(makeInputRow(minTimestamp + 4, true, STRING_ARRAY_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, STRING_ARRAY_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_ARRAY_COL, STRING_ARRAY_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertArrayEquals(new Object[]{"a"}, (Object[]) valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertArrayEquals(new Object[]{"b", "c"}, (Object[]) valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertArrayEquals(new Object[]{"d", "e"}, (Object[]) valueSelector.getObject()); + + columnSelectorFactory = 
cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootVariant() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, VARIANT_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, VARIANT_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, VARIANT_COL, 3.3)); + index.add(makeInputRow(minTimestamp + 4, true, VARIANT_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, VARIANT_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(VARIANT_COL, VARIANT_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + DimensionSelector dimensionSelector = cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + 
Assert.assertEquals("a", valueSelector.getObject()); + Assert.assertEquals("a", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2L, valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals("2", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3.3, valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals("3.3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertNull(dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertNull(dimensionSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryNested() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, NESTED_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, 
true, NESTED_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, NESTED_COL, ImmutableMap.of("x", 1.1, "y", 2L))); + index.add(makeInputRow(minTimestamp + 4, true, NESTED_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, NESTED_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(NESTED_COL, NESTED_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap("a"), valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(2L), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(ImmutableMap.of("x", 1.1, "y", 2L)), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + 
columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + } + + @Nonnull + private static IncrementalIndex makeIncrementalIndex(long minTimestamp) + { + IncrementalIndex index = new OnheapIncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema( + minTimestamp, + new TimestampSpec(TIME_COL, "millis", null), + Granularities.NONE, + VirtualColumns.EMPTY, + DimensionsSpec.builder().useSchemaDiscovery(true).build(), + new AggregatorFactory[0], + false + ) + ) + .setMaxRowCount(1000) + .build(); + return index; + } + + private MapBasedInputRow makeInputRow( + long timestamp, + boolean explicitNull, + Object... kv + ) + { + final Map event = TestHelper.makeMap(explicitNull, kv); + event.put("time", timestamp); + return new MapBasedInputRow(timestamp, ImmutableList.copyOf(event.keySet()), event); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java new file mode 100644 index 000000000000..3270ce155b12 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class NestedDataColumnSchemaTest +{ + private static final DefaultColumnFormatConfig DEFAULT_CONFIG = new DefaultColumnFormatConfig(null); + private static final DefaultColumnFormatConfig DEFAULT_CONFIG_V4 = new DefaultColumnFormatConfig(4); + private static final ObjectMapper MAPPER; + private static final ObjectMapper MAPPER_V4; + static { + MAPPER = new DefaultObjectMapper(); + MAPPER.setInjectableValues( + new InjectableValues.Std().addValue( + DefaultColumnFormatConfig.class, + DEFAULT_CONFIG + ) + ); + + MAPPER_V4 = new DefaultObjectMapper(); + MAPPER_V4.setInjectableValues( + new InjectableValues.Std().addValue( + DefaultColumnFormatConfig.class, + DEFAULT_CONFIG_V4 + ) + ); + } + + @Test + public void testSerdeRoundTrip() throws JsonProcessingException + { + final NestedDataColumnSchema v4 = new NestedDataColumnSchema("test", 4); + final NestedDataColumnSchema v5 = new NestedDataColumnSchema("test", 5); + Assert.assertEquals(v4, 
MAPPER.readValue(MAPPER.writeValueAsString(v4), NestedDataColumnSchema.class)); + Assert.assertEquals(v5, MAPPER.readValue(MAPPER.writeValueAsString(v5), NestedDataColumnSchema.class)); + } + + @Test + public void testSerdeDefault() throws JsonProcessingException + { + final String there = "{\"type\":\"json\", \"name\":\"test\"}"; + NestedDataColumnSchema andBack = MAPPER.readValue(there, NestedDataColumnSchema.class); + Assert.assertEquals(new NestedDataColumnSchema("test", 5), andBack); + } + + @Test + public void testSerdeSystemDefault() throws JsonProcessingException + { + final String there = "{\"type\":\"json\", \"name\":\"test\"}"; + NestedDataColumnSchema andBack = MAPPER_V4.readValue(there, NestedDataColumnSchema.class); + Assert.assertEquals(new NestedDataColumnSchema("test", 4), andBack); + } + + @Test + public void testSerdeOverride() throws JsonProcessingException + { + final String there = "{\"type\":\"json\", \"name\":\"test\",\"formatVersion\":4}"; + NestedDataColumnSchema andBack = MAPPER.readValue(there, NestedDataColumnSchema.class); + Assert.assertEquals(new NestedDataColumnSchema("test", 4), andBack); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java index 3f5a6ef337f4..cb1a8270af7a 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java @@ -22,28 +22,49 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import 
org.apache.druid.collections.bitmap.RoaringBitmapFactory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.query.DefaultBitmapResultFactory; import org.apache.druid.query.filter.SelectorPredicateFactory; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.IndexableAdapter; +import org.apache.druid.segment.NestedDataColumnIndexerV4; +import org.apache.druid.segment.ObjectColumnSelector; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.TypeStrategy; import org.apache.druid.segment.index.BitmapColumnIndex; import org.apache.druid.segment.index.semantic.DruidPredicateIndexes; import org.apache.druid.segment.index.semantic.NullValueIndex; import org.apache.druid.segment.index.semantic.StringValueSetIndexes; +import org.apache.druid.segment.serde.ColumnPartSerde; +import org.apache.druid.segment.serde.ComplexColumnPartSerde; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; 
import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.utils.CompressionUtils; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -52,22 +73,26 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest { private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); - private static final String NO_MATCH = "no"; - @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - DefaultBitmapResultFactory resultFactory = new DefaultBitmapResultFactory(new RoaringBitmapFactory()); - List> data = ImmutableList.of( TestHelper.makeMap("x", 1L, "y", 1.0, "z", "a", "v", "100", "nullish", "notnull"), TestHelper.makeMap("y", 3.0, "z", "d", "v", 1000L, "nullish", null), @@ -77,20 +102,178 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest TestHelper.makeMap("x", 4L, "y", 2.0, "z", "e", "v", 11111L, "nullish", null) ); + List> arrayTestData = ImmutableList.of( + TestHelper.makeMap("s", new Object[]{"a", "b", "c"}, "l", new Object[]{1L, 2L, 3L}, "d", new Object[]{1.1, 2.2}), + TestHelper.makeMap( + "s", + new Object[]{null, "b", "c"}, + "l", + new Object[]{1L, null, 3L}, + "d", + new Object[]{2.2, 2.2} + ), + TestHelper.makeMap( + "s", + new Object[]{"b", "c"}, + "l", + new Object[]{null, null}, + "d", + new Object[]{1.1, null, 2.2} + ), + TestHelper.makeMap("s", new Object[]{"a", "b", "c", "d"}, "l", new 
Object[]{4L, 2L, 3L}), + TestHelper.makeMap("s", new Object[]{"d", "b", "c", "a"}, "d", new Object[]{1.1, 2.2}), + TestHelper.makeMap("l", new Object[]{1L, 2L, 3L}, "d", new Object[]{3.1, 2.2, 1.9}) + ); + Closer closer = Closer.create(); + SmooshedFileMapper fileMapper; + + ByteBuffer baseBuffer; + + SmooshedFileMapper arrayFileMapper; + + ByteBuffer arrayBaseBuffer; + @BeforeClass public static void staticSetup() { NestedDataModule.registerHandlersAndSerde(); } + @Before + public void setup() throws IOException + { + final String fileNameBase = "test"; + final String arrayFileNameBase = "array"; + fileMapper = smooshify(fileNameBase, tempFolder.newFolder(), data); + baseBuffer = fileMapper.mapFile(fileNameBase); + arrayFileMapper = smooshify(arrayFileNameBase, tempFolder.newFolder(), arrayTestData); + arrayBaseBuffer = arrayFileMapper.mapFile(arrayFileNameBase); + } + + private SmooshedFileMapper smooshify( + String fileNameBase, + File tmpFile, + List> data + ) + throws IOException + { + SegmentWriteOutMediumFactory writeOutMediumFactory = TmpFileSegmentWriteOutMediumFactory.instance(); + try (final FileSmoosher smoosher = new FileSmoosher(tmpFile)) { + NestedDataColumnSerializerV4 serializer = new NestedDataColumnSerializerV4( + fileNameBase, + IndexSpec.DEFAULT, + writeOutMediumFactory.makeSegmentWriteOutMedium(tempFolder.newFolder()), + closer + ); + + NestedDataColumnIndexerV4 indexer = new NestedDataColumnIndexerV4(); + for (Object o : data) { + indexer.processRowValsToUnsortedEncodedKeyComponent(o, false); + } + SortedMap sortedFields = new TreeMap<>(); + + IndexableAdapter.NestedColumnMergable mergable = closer.register( + new IndexableAdapter.NestedColumnMergable( + indexer.getSortedValueLookups(), + indexer.getFieldTypeInfo(), + true, + false, + null + ) + ); + SortedValueDictionary globalDictionarySortedCollector = mergable.getValueDictionary(); + mergable.mergeFieldsInto(sortedFields); + + serializer.open(); + serializer.serializeFields(sortedFields); 
+ serializer.serializeDictionaries( + globalDictionarySortedCollector.getSortedStrings(), + globalDictionarySortedCollector.getSortedLongs(), + globalDictionarySortedCollector.getSortedDoubles() + ); + + SettableSelector valueSelector = new SettableSelector(); + for (Object o : data) { + valueSelector.setObject(StructuredData.wrap(o)); + serializer.serialize(valueSelector); + } + + try (SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize())) { + serializer.writeTo(writer, smoosher); + } + smoosher.close(); + return closer.register(SmooshedFileMapper.load(tmpFile)); + } + } + @After public void teardown() throws IOException { closer.close(); } + @Test + public void testBasicFunctionality() throws IOException + { + ColumnBuilder bob = new ColumnBuilder(); + bob.setFileMapper(fileMapper); + ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.createDeserializer(NestedDataComplexTypeSerde.TYPE_NAME); + ColumnPartSerde.Deserializer deserializer = partSerde.getDeserializer(); + deserializer.read(baseBuffer, bob, NestedFieldColumnIndexSupplierTest.ALWAYS_USE_INDEXES); + final ColumnHolder holder = bob.build(); + final ColumnCapabilities capabilities = holder.getCapabilities(); + Assert.assertEquals(ColumnType.NESTED_DATA, capabilities.toColumnType()); + Assert.assertTrue(holder.getColumnFormat() instanceof NestedDataComplexTypeSerde.NestedColumnFormatV4); + try (NestedDataComplexColumn column = (NestedDataComplexColumn) holder.getColumn()) { + smokeTest(column); + } + } + + @Test + public void testConcurrency() throws ExecutionException, InterruptedException + { + // if this test ever starts being to be a flake, there might be thread safety issues + ColumnBuilder bob = new ColumnBuilder(); + bob.setFileMapper(fileMapper); + NestedDataColumnSupplierV4 supplier = NestedDataColumnSupplierV4.read( + baseBuffer, + bob, + NestedFieldColumnIndexSupplierTest.ALWAYS_USE_INDEXES, + NestedDataComplexTypeSerde.OBJECT_MAPPER + ); + 
final String expectedReason = "none"; + final AtomicReference failureReason = new AtomicReference<>(expectedReason); + + final int threads = 10; + ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d") + ); + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) { + smokeTest(column); + } + } + } + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + @Test public void testLegacyV3ReaderFormat() throws IOException { @@ -106,34 +289,27 @@ public void testLegacyV3ReaderFormat() throws IOException ColumnHolder holder = theIndex.getColumnHolder(columnName); Assert.assertNotNull(holder); Assert.assertEquals(ColumnType.NESTED_DATA, holder.getCapabilities().toColumnType()); - NestedDataColumnV3 v3 = closer.register((NestedDataColumnV3) holder.getColumn()); Assert.assertNotNull(v3); - List path = ImmutableList.of(new NestedPathField("lastName")); ColumnHolder nestedColumnHolder = v3.getColumnHolder(path); Assert.assertNotNull(nestedColumnHolder); Assert.assertEquals(ColumnType.STRING, nestedColumnHolder.getCapabilities().toColumnType()); NestedFieldDictionaryEncodedColumn nestedColumn = (NestedFieldDictionaryEncodedColumn) nestedColumnHolder.getColumn(); - Assert.assertNotNull(nestedColumn); - ColumnValueSelector selector = nestedColumn.makeColumnValueSelector( new SimpleAscendingOffset(theIndex.getNumRows()) ); - ColumnIndexSupplier indexSupplier = v3.getColumnIndexSupplier(path); Assert.assertNotNull(indexSupplier); 
StringValueSetIndexes valueSetIndex = indexSupplier.as(StringValueSetIndexes.class); Assert.assertNotNull(valueSetIndex); - BitmapColumnIndex indexForValue = valueSetIndex.forValue(firstValue); Assert.assertEquals(firstValue, selector.getObject()); Assert.assertTrue(indexForValue.computeBitmapResult(resultFactory).get(0)); } } - @Test public void testLegacyV4ReaderFormat() throws IOException { @@ -150,39 +326,31 @@ public void testLegacyV4ReaderFormat() throws IOException ColumnHolder holder = theIndex.getColumnHolder(columnName); Assert.assertNotNull(holder); Assert.assertEquals(ColumnType.NESTED_DATA, holder.getCapabilities().toColumnType()); - NestedDataColumnV4 v4 = closer.register((NestedDataColumnV4) holder.getColumn()); Assert.assertNotNull(v4); - List path = ImmutableList.of(new NestedPathField("lastName")); ColumnHolder nestedColumnHolder = v4.getColumnHolder(path); Assert.assertNotNull(nestedColumnHolder); Assert.assertEquals(ColumnType.STRING, nestedColumnHolder.getCapabilities().toColumnType()); NestedFieldDictionaryEncodedColumn nestedColumn = (NestedFieldDictionaryEncodedColumn) nestedColumnHolder.getColumn(); - Assert.assertNotNull(nestedColumn); - ColumnValueSelector selector = nestedColumn.makeColumnValueSelector( new SimpleAscendingOffset(theIndex.getNumRows()) ); - ColumnIndexSupplier indexSupplier = v4.getColumnIndexSupplier(path); Assert.assertNotNull(indexSupplier); StringValueSetIndexes valueSetIndex = indexSupplier.as(StringValueSetIndexes.class); Assert.assertNotNull(valueSetIndex); - BitmapColumnIndex indexForValue = valueSetIndex.forValue(firstValue); Assert.assertEquals(firstValue, selector.getObject()); Assert.assertTrue(indexForValue.computeBitmapResult(resultFactory).get(0)); } } - private void smokeTest(NestedDataComplexColumn column) throws IOException { SimpleAscendingOffset offset = new SimpleAscendingOffset(data.size()); ColumnValueSelector rawSelector = column.makeColumnValueSelector(offset); - final List xPath = 
NestedPathFinder.parseJsonPath("$.x"); Assert.assertEquals(ImmutableSet.of(ColumnType.LONG), column.getColumnTypes(xPath)); Assert.assertEquals(ColumnType.LONG, column.getColumnHolder(xPath).getCapabilities().toColumnType()); @@ -193,7 +361,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes xValueIndex = xIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes xPredicateIndex = xIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex xNulls = xIndexSupplier.as(NullValueIndex.class); - final List yPath = NestedPathFinder.parseJsonPath("$.y"); Assert.assertEquals(ImmutableSet.of(ColumnType.DOUBLE), column.getColumnTypes(yPath)); Assert.assertEquals(ColumnType.DOUBLE, column.getColumnHolder(yPath).getCapabilities().toColumnType()); @@ -204,7 +371,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes yValueIndex = yIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes yPredicateIndex = yIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex yNulls = yIndexSupplier.as(NullValueIndex.class); - final List zPath = NestedPathFinder.parseJsonPath("$.z"); Assert.assertEquals(ImmutableSet.of(ColumnType.STRING), column.getColumnTypes(zPath)); Assert.assertEquals(ColumnType.STRING, column.getColumnHolder(zPath).getCapabilities().toColumnType()); @@ -215,7 +381,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes zValueIndex = zIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes zPredicateIndex = zIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex zNulls = zIndexSupplier.as(NullValueIndex.class); - final List vPath = NestedPathFinder.parseJsonPath("$.v"); Assert.assertEquals( ImmutableSet.of(ColumnType.STRING, ColumnType.LONG, ColumnType.DOUBLE), @@ -229,7 +394,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes 
vValueIndex = vIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes vPredicateIndex = vIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex vNulls = vIndexSupplier.as(NullValueIndex.class); - final List nullishPath = NestedPathFinder.parseJsonPath("$.nullish"); Assert.assertEquals(ImmutableSet.of(ColumnType.STRING), column.getColumnTypes(nullishPath)); Assert.assertEquals(ColumnType.STRING, column.getColumnHolder(nullishPath).getCapabilities().toColumnType()); @@ -240,16 +404,13 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes nullishValueIndex = nullishIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes nullishPredicateIndex = nullishIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex nullishNulls = nullishIndexSupplier.as(NullValueIndex.class); - Assert.assertEquals(ImmutableList.of(nullishPath, vPath, xPath, yPath, zPath), column.getNestedFields()); - for (int i = 0; i < data.size(); i++) { Map row = data.get(i); Assert.assertEquals( JSON_MAPPER.writeValueAsString(row), JSON_MAPPER.writeValueAsString(StructuredData.unwrap(rawSelector.getObject())) ); - testPath(row, i, "v", vSelector, vDimSelector, vValueIndex, vPredicateIndex, vNulls, null); testPath(row, i, "x", xSelector, xDimSelector, xValueIndex, xPredicateIndex, xNulls, ColumnType.LONG); testPath(row, i, "y", ySelector, yDimSelector, yValueIndex, yPredicateIndex, yNulls, ColumnType.DOUBLE); @@ -265,11 +426,9 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException nullishNulls, ColumnType.STRING ); - offset.increment(); } } - private void testPath( Map row, int rowNumber, @@ -287,7 +446,6 @@ private void testPath( // to take the null checking path final boolean isStringAndNullEquivalent = inputValue instanceof String && NullHandling.isNullOrEquivalent((String) inputValue); - if (row.containsKey(path) && inputValue != null && !isStringAndNullEquivalent) { Assert.assertEquals(inputValue, 
valueSelector.getObject()); if (ColumnType.LONG.equals(singleType)) { @@ -297,13 +455,11 @@ private void testPath( Assert.assertEquals((double) inputValue, valueSelector.getDouble(), 0.0); Assert.assertFalse(path + " is not null", valueSelector.isNull()); } - final String theString = String.valueOf(inputValue); Assert.assertEquals(theString, dimSelector.getObject()); String dimSelectorLookupVal = dimSelector.lookupName(dimSelector.getRow().get(0)); Assert.assertEquals(theString, dimSelectorLookupVal); Assert.assertEquals(dimSelector.idLookup().lookupId(dimSelectorLookupVal), dimSelector.getRow().get(0)); - Assert.assertTrue(valueSetIndex.forValue(theString).computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(theString))) .computeBitmapResult(resultFactory) @@ -319,7 +475,6 @@ private void testPath( .computeBitmapResult(resultFactory) .get(rowNumber)); Assert.assertFalse(nullValueIndex.get().computeBitmapResult(resultFactory).get(rowNumber)); - Assert.assertTrue(dimSelector.makeValueMatcher(theString).matches()); Assert.assertFalse(dimSelector.makeValueMatcher(NO_MATCH).matches()); Assert.assertTrue(dimSelector.makeValueMatcher(x -> Objects.equals(x, theString)).matches()); @@ -327,28 +482,104 @@ private void testPath( } else { Assert.assertNull(valueSelector.getObject()); Assert.assertTrue(path, valueSelector.isNull()); - Assert.assertEquals(0, dimSelector.getRow().get(0)); Assert.assertNull(dimSelector.getObject()); Assert.assertNull(dimSelector.lookupName(dimSelector.getRow().get(0))); - Assert.assertTrue(valueSetIndex.forValue(null).computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertTrue(nullValueIndex.get().computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertTrue(predicateIndex.forPredicate(new SelectorPredicateFactory(null)) .computeBitmapResult(resultFactory) .get(rowNumber)); 
Assert.assertFalse(valueSetIndex.forValue(NO_MATCH).computeBitmapResult(resultFactory).get(rowNumber)); - - Assert.assertFalse(valueSetIndex.forValue(NO_MATCH).computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertFalse(predicateIndex.forPredicate(new SelectorPredicateFactory(NO_MATCH)) .computeBitmapResult(resultFactory) .get(rowNumber)); - Assert.assertTrue(dimSelector.makeValueMatcher((String) null).matches()); Assert.assertFalse(dimSelector.makeValueMatcher(NO_MATCH).matches()); Assert.assertTrue(dimSelector.makeValueMatcher(x -> x == null).matches()); Assert.assertFalse(dimSelector.makeValueMatcher(x -> Objects.equals(x, NO_MATCH)).matches()); } } + + private static class SettableSelector extends ObjectColumnSelector + { + private StructuredData data; + + public void setObject(StructuredData o) + { + this.data = o; + } + + @Nullable + @Override + public StructuredData getObject() + { + return data; + } + + @Override + public Class classOfObject() + { + return StructuredData.class; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + } + + private static class OnlyPositionalReadsTypeStrategy implements TypeStrategy + { + private final TypeStrategy delegate; + + private OnlyPositionalReadsTypeStrategy(TypeStrategy delegate) + { + this.delegate = delegate; + } + + @Override + public int estimateSizeBytes(T value) + { + return delegate.estimateSizeBytes(value); + } + + @Override + public T read(ByteBuffer buffer) + { + throw new IllegalStateException("non-positional read"); + } + + @Override + public boolean readRetainsBufferReference() + { + return delegate.readRetainsBufferReference(); + } + + @Override + public int write(ByteBuffer buffer, T value, int maxSizeBytes) + { + return delegate.write(buffer, value, maxSizeBytes); + } + + @Override + public T read(ByteBuffer buffer, int offset) + { + return delegate.read(buffer, offset); + } + + @Override + public int write(ByteBuffer buffer, int offset, T value, 
int maxSizeBytes) + { + return delegate.write(buffer, offset, value, maxSizeBytes); + } + + @Override + public int compare(Object o1, Object o2) + { + return delegate.compare(o1, o2); + } + } } diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java index 3ae5d2a3586f..e788949f66b9 100644 --- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java @@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.segment.DefaultColumnFormatConfig; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocation; @@ -55,6 +56,7 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); + JsonConfigProvider.bind(binder, "druid.indexing.formats", DefaultColumnFormatConfig.class); bindLocationSelectorStrategy(binder); binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null)); binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class).in(LazySingleton.class); From 9224d0fa4a769475801ab9f850b559222d89a55f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 8 Sep 2023 16:45:38 -0700 Subject: [PATCH 2/8] test stuff --- .../org/apache/druid/segment/DefaultColumnFormatConfig.java | 2 +- .../org/apache/druid/sql/calcite/util/SqlTestFramework.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java index 696827f7c452..ea51ec19ba84 100644 
--- a/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java +++ b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java @@ -67,7 +67,7 @@ public int hashCode() @Override public String toString() { - return "DefaultColumnFormatsConfig{" + + return "DefaultColumnFormatConfig{" + "nestedColumnFormatVersion=" + nestedColumnFormatVersion + '}'; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index 3467d71bb627..2895eaeb01e1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -39,6 +39,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.topn.TopNQueryConfig; +import org.apache.druid.segment.DefaultColumnFormatConfig; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; @@ -485,6 +486,7 @@ public void configure(Binder binder) { binder.bind(DruidOperatorTable.class).in(LazySingleton.class); binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT); + binder.bind(DefaultColumnFormatConfig.class).toInstance(new DefaultColumnFormatConfig(null)); } @Provides From bcdd43450c027eb2ac7df9eec5d5e895a4510acd Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 8 Sep 2023 16:49:33 -0700 Subject: [PATCH 3/8] fix maybe --- .../test/java/org/apache/druid/guice/NestedDataModuleTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java b/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java index 8cc7e4cda85c..59b5b2df4158 
100644 --- a/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java +++ b/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java @@ -46,9 +46,10 @@ public static void setup() DEFAULT_HANDLER_PROVIDER = DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.get( NestedDataComplexTypeSerde.TYPE_NAME ); + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); } @AfterClass - public static void restore() + public static void teardown() { if (DEFAULT_HANDLER_PROVIDER == null) { DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); From 87dc8386019e08c426ef502fa68713f58f92e906 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 8 Sep 2023 16:52:45 -0700 Subject: [PATCH 4/8] style --- .../org/apache/druid/segment/NestedDataColumnSchemaTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java index 3270ce155b12..fe232850e233 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java @@ -32,6 +32,7 @@ public class NestedDataColumnSchemaTest private static final DefaultColumnFormatConfig DEFAULT_CONFIG_V4 = new DefaultColumnFormatConfig(4); private static final ObjectMapper MAPPER; private static final ObjectMapper MAPPER_V4; + static { MAPPER = new DefaultObjectMapper(); MAPPER.setInjectableValues( From 6cae3d99b41781a3409fcbb8bd2fb3ceabbd30cd Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 8 Sep 2023 18:45:28 -0700 Subject: [PATCH 5/8] named handler providers --- .../apache/druid/guice/NestedDataModule.java | 29 +++++++++++++++++-- .../segment/NestedDataColumnSchemaTest.java | 2 +- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git 
a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java index 5de6f5ea65ea..247d83af81ed 100644 --- a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java +++ b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java @@ -27,6 +27,8 @@ import com.google.inject.Provides; import org.apache.druid.initialization.DruidModule; import org.apache.druid.segment.DefaultColumnFormatConfig; +import org.apache.druid.segment.DimensionHandler; +import org.apache.druid.segment.DimensionHandlerProvider; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.NestedCommonFormatColumnHandler; import org.apache.druid.segment.NestedDataColumnHandlerV4; @@ -63,12 +65,12 @@ public SideEffectHandlerRegisterer registerHandler(DefaultColumnFormatConfig for if (formatsConfig.getNestedColumnFormatVersion() != null && formatsConfig.getNestedColumnFormatVersion() == 4) { DimensionHandlerUtils.registerDimensionHandlerProvider( NestedDataComplexTypeSerde.TYPE_NAME, - NestedDataColumnHandlerV4::new + new NestedColumnV4HandlerProvider() ); } else { DimensionHandlerUtils.registerDimensionHandlerProvider( NestedDataComplexTypeSerde.TYPE_NAME, - NestedCommonFormatColumnHandler::new + new NestedCommonFormatHandlerProvider() ); } return new SideEffectHandlerRegisterer(); @@ -92,7 +94,7 @@ public static void registerHandlersAndSerde() registerSerde(); DimensionHandlerUtils.registerDimensionHandlerProvider( NestedDataComplexTypeSerde.TYPE_NAME, - NestedCommonFormatColumnHandler::new + new NestedCommonFormatHandlerProvider() ); } @@ -103,6 +105,27 @@ private static void registerSerde() } } + public static class NestedCommonFormatHandlerProvider + implements DimensionHandlerProvider + { + + @Override + public DimensionHandler get(String dimensionName) + { + return new NestedCommonFormatColumnHandler(dimensionName); + } + } + + public static class 
NestedColumnV4HandlerProvider + implements DimensionHandlerProvider + { + + @Override + public DimensionHandler get(String dimensionName) + { + return new NestedDataColumnHandlerV4(dimensionName); + } + } /** * this is used as a vehicle to register the correct version of the system default nested column handler by side * effect with the help of binding to {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} so that diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java index fe232850e233..476207a29461 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java @@ -32,7 +32,7 @@ public class NestedDataColumnSchemaTest private static final DefaultColumnFormatConfig DEFAULT_CONFIG_V4 = new DefaultColumnFormatConfig(4); private static final ObjectMapper MAPPER; private static final ObjectMapper MAPPER_V4; - + static { MAPPER = new DefaultObjectMapper(); MAPPER.setInjectableValues( From 7fca66d15a0d786021950decbf05d941b2c75125 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sat, 9 Sep 2023 19:30:48 -0700 Subject: [PATCH 6/8] fix test --- .../druid/indexing/overlord/sampler/SamplerResponseTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java index 4fd1090d172d..b8693b86dcd3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java @@ -71,7 +71,7 @@ public void testSerde() throws IOException data ) ); - String expected = 
"{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"json\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; + String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"auto\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; Assert.assertEquals(expected, out); } From c02e361bdc09178fcf07cd859a010b24625c574b Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 11 Sep 2023 14:01:59 -0700 Subject: [PATCH 7/8] fix test --- .../org/apache/druid/cli/DumpSegmentTest.java | 92 ++++++++----------- .../src/test/resources/nested-test-aggs.json | 6 -- .../test/resources/nested-test-parser.json | 20 ---- 3 files changed, 39 insertions(+), 79 deletions(-) delete mode 100644 services/src/test/resources/nested-test-aggs.json delete mode 100644 
services/src/test/resources/nested-test-parser.json diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java index fb440311a6e0..868eec46fd05 100644 --- a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java +++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java @@ -19,10 +19,9 @@ package org.apache.druid.cli; -import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.name.Names; @@ -30,6 +29,8 @@ import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.RoaringBitmapFactory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.NestedDataModule; import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.guice.annotations.Json; @@ -40,15 +41,19 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; -import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import 
org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.DefaultColumnFormatConfig; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnConfig; @@ -56,7 +61,6 @@ import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex; import org.apache.druid.testing.InitializedNullHandlingTest; -import org.apache.druid.timeline.SegmentId; import org.junit.After; import org.junit.Assert; import org.junit.Rule; @@ -66,16 +70,13 @@ import org.mockito.Mockito; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Collections; import java.util.List; public class DumpSegmentTest extends InitializedNullHandlingTest { - private final AggregationTestHelper helper; private final Closer closer; @Rule @@ -84,11 +85,6 @@ public class DumpSegmentTest extends InitializedNullHandlingTest public DumpSegmentTest() { NestedDataModule.registerHandlersAndSerde(); - List mods = NestedDataModule.getJacksonModulesList(); - this.helper = AggregationTestHelper.createScanQueryAggregationTestHelper( - mods, - tempFolder - ); this.closer = Closer.create(); } @@ -159,9 +155,16 @@ public void testDumpNestedColumn() throws Exception Injector injector = Mockito.mock(Injector.class); ObjectMapper mapper = TestHelper.makeJsonMapper(); mapper.registerModules(NestedDataModule.getJacksonModulesList()); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), mapper) + .addValue(DefaultColumnFormatConfig.class, new DefaultColumnFormatConfig(null)) + ); 
Mockito.when(injector.getInstance(Key.get(ObjectMapper.class, Json.class))).thenReturn(mapper); + Mockito.when(injector.getInstance(DefaultColumnFormatConfig.class)).thenReturn(new DefaultColumnFormatConfig(null)); - List segments = createSegments(helper, tempFolder, closer); + List segments = createSegments(tempFolder, closer); QueryableIndex queryableIndex = segments.get(0).asQueryableIndex(); File outputFile = tempFolder.newFile(); @@ -192,8 +195,16 @@ public void testDumpNestedColumnPath() throws Exception Injector injector = Mockito.mock(Injector.class); ObjectMapper mapper = TestHelper.makeJsonMapper(); mapper.registerModules(NestedDataModule.getJacksonModulesList()); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), mapper) + .addValue(DefaultColumnFormatConfig.class, new DefaultColumnFormatConfig(null)) + ); Mockito.when(injector.getInstance(Key.get(ObjectMapper.class, Json.class))).thenReturn(mapper); - List segments = createSegments(helper, tempFolder, closer); + Mockito.when(injector.getInstance(DefaultColumnFormatConfig.class)).thenReturn(new DefaultColumnFormatConfig(null)); + + List segments = createSegments(tempFolder, closer); QueryableIndex queryableIndex = segments.get(0).asQueryableIndex(); File outputFile = tempFolder.newFile(); @@ -236,49 +247,24 @@ public void testGetModules() public static List createSegments( - AggregationTestHelper helper, TemporaryFolder tempFolder, Closer closer ) throws Exception { - File segmentDir = tempFolder.newFolder(); - File inputFile = readFileFromClasspath("nested-test-data.json"); - FileInputStream inputDataStream = new FileInputStream(inputFile); - String parserJson = readFileFromClasspathAsString("nested-test-parser.json"); - String aggJson = readFileFromClasspathAsString("nested-test-aggs.json"); - - helper.createIndex( - inputDataStream, - parserJson, - aggJson, - segmentDir, - 0, + 
return NestedDataTestUtils.createSegments( + tempFolder, + closer, + "nested-test-data.json", + NestedDataTestUtils.DEFAULT_JSON_INPUT_FORMAT, + new TimestampSpec("timestamp", null, null), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), + null, + new AggregatorFactory[] { + new CountAggregatorFactory("count") + }, Granularities.HOUR, - 1000, - true - ); - - final List segments = Lists.transform( - ImmutableList.of(segmentDir), - dir -> { - try { - return closer.register(new QueryableIndexSegment(helper.getIndexIO().loadIndex(dir), SegmentId.dummy(""))); - } - catch (IOException ex) { - throw new RuntimeException(ex); - } - } + true, + IndexSpec.DEFAULT ); - - return segments; - } - public static File readFileFromClasspath(String fileName) - { - return new File(NestedDataTestUtils.class.getClassLoader().getResource(fileName).getFile()); - } - - public static String readFileFromClasspathAsString(String fileName) throws IOException - { - return com.google.common.io.Files.asCharSource(readFileFromClasspath(fileName), StandardCharsets.UTF_8).read(); } } diff --git a/services/src/test/resources/nested-test-aggs.json b/services/src/test/resources/nested-test-aggs.json deleted file mode 100644 index 6a330e6ccd90..000000000000 --- a/services/src/test/resources/nested-test-aggs.json +++ /dev/null @@ -1,6 +0,0 @@ -[ - { - "type": "count", - "name": "count" - } -] diff --git a/services/src/test/resources/nested-test-parser.json b/services/src/test/resources/nested-test-parser.json deleted file mode 100644 index 9ba5cd0c57bd..000000000000 --- a/services/src/test/resources/nested-test-parser.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "type": "string", - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "timestamp", - "format": "auto" - }, - "dimensionsSpec": { - "dimensions": [ - { - "type": "json", - "name": "nest" - } - ], - "dimensionExclusions": [], - "spatialDimensions": [] - } - } -} From 21dfb6497970daf2bda9ddbbec2ccf51b7dce4b3 Mon Sep 17 
00:00:00 2001 From: Clint Wylie Date: Mon, 11 Sep 2023 23:01:43 -0700 Subject: [PATCH 8/8] javadoc and validation --- .../druid/segment/DefaultColumnFormatConfig.java | 13 +++++++++++++ .../druid/segment/NestedDataColumnSchema.java | 8 ++++++++ .../druid/segment/NestedDataColumnSchemaTest.java | 15 +++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java index ea51ec19ba84..3f41fb88c032 100644 --- a/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java +++ b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java @@ -21,12 +21,24 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.DruidException; import javax.annotation.Nullable; import java.util.Objects; public class DefaultColumnFormatConfig { + public static void validateNestedFormatVersion(@Nullable Integer formatVersion) + { + if (formatVersion != null) { + if (formatVersion < 4 || formatVersion > 5) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Unsupported nested column format version[%s]", formatVersion); + } + } + } + @JsonProperty("nestedColumnFormatVersion") private final Integer nestedColumnFormatVersion; @@ -36,6 +48,7 @@ public DefaultColumnFormatConfig( ) { this.nestedColumnFormatVersion = nestedColumnFormatVersion; + validateNestedFormatVersion(this.nestedColumnFormatVersion); } @Nullable diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java index 7b3e79c1dd5b..acde3af49bfc 100644 --- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java +++ 
b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java @@ -29,6 +29,12 @@ import javax.annotation.Nullable; import java.util.Objects; +/** + * Nested column {@link DimensionSchema}. If {@link #formatVersion} is set to 4, or null and + * {@link DefaultColumnFormatConfig#nestedColumnFormatVersion} is set to 4, then {@link NestedDataColumnHandlerV4} is + * used, else {@link NestedCommonFormatColumnHandler} is used instead and this is equivalent to using + * {@link AutoTypeColumnSchema} + */ public class NestedDataColumnSchema extends DimensionSchema { final int formatVersion; @@ -50,6 +56,7 @@ public NestedDataColumnSchema( // but as far as this is concerned it is v5 formatVersion = 5; } + DefaultColumnFormatConfig.validateNestedFormatVersion(this.formatVersion); } public NestedDataColumnSchema( @@ -59,6 +66,7 @@ public NestedDataColumnSchema( { super(name, null, true); this.formatVersion = version; + DefaultColumnFormatConfig.validateNestedFormatVersion(this.formatVersion); } @JsonProperty("formatVersion") diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java index 476207a29461..8bfcfdefa4f4 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; import org.apache.druid.jackson.DefaultObjectMapper; import org.junit.Assert; import org.junit.Test; @@ -83,4 +84,18 @@ public void testSerdeOverride() throws JsonProcessingException NestedDataColumnSchema andBack = MAPPER.readValue(there, NestedDataColumnSchema.class); Assert.assertEquals(new NestedDataColumnSchema("test", 
4), andBack); } + + @Test + public void testVersionTooSmall() + { + Throwable t = Assert.assertThrows(DruidException.class, () -> new NestedDataColumnSchema("test", 3)); + Assert.assertEquals("Unsupported nested column format version[3]", t.getMessage()); + } + + @Test + public void testVersionTooBig() + { + Throwable t = Assert.assertThrows(DruidException.class, () -> new NestedDataColumnSchema("test", 6)); + Assert.assertEquals("Unsupported nested column format version[6]", t.getMessage()); + } }