diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java index 4fd1090d172d..b8693b86dcd3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java @@ -71,7 +71,7 @@ public void testSerde() throws IOException data ) ); - String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"json\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; + String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"auto\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; Assert.assertEquals(expected, out); } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java index 379c8c21ac9f..9b692ecb7c55 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java @@ -32,6 +32,7 @@ import org.apache.druid.segment.AutoTypeColumnSchema; import org.apache.druid.segment.DimensionHandler; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.NestedDataColumnSchema; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.TypeSignature; import org.apache.druid.segment.column.ValueType; @@ -50,7 +51,7 @@ @JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class), - @JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = AutoTypeColumnSchema.class), + @JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataColumnSchema.class), @JsonSubTypes.Type(name = AutoTypeColumnSchema.TYPE, value = AutoTypeColumnSchema.class) }) public abstract class DimensionSchema diff --git a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java index 32564f4dd408..247d83af81ed 100644 --- a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java +++ b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java @@ -24,9 +24,14 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.annotations.VisibleForTesting; import com.google.inject.Binder; +import com.google.inject.Provides; import org.apache.druid.initialization.DruidModule; +import org.apache.druid.segment.DefaultColumnFormatConfig; +import org.apache.druid.segment.DimensionHandler; +import org.apache.druid.segment.DimensionHandlerProvider; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.NestedCommonFormatColumnHandler; +import org.apache.druid.segment.NestedDataColumnHandlerV4; import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import org.apache.druid.segment.nested.StructuredData; import org.apache.druid.segment.nested.StructuredDataJsonSerializer; @@ -47,20 +52,28 @@ public List getJacksonModules() @Override public void configure(Binder binder) { - registerHandlersAndSerde(); + registerSerde(); + // binding our side effect class to the lifecycle causes registerHandler to be called on service start, allowing + // use of the config to get the system default format version + LifecycleModule.register(binder, SideEffectHandlerRegisterer.class); } - - @VisibleForTesting - public static void registerHandlersAndSerde() + @Provides + @LazySingleton + public SideEffectHandlerRegisterer registerHandler(DefaultColumnFormatConfig formatsConfig) { - if (ComplexMetrics.getSerdeForType(NestedDataComplexTypeSerde.TYPE_NAME) == null) { - ComplexMetrics.registerSerde(NestedDataComplexTypeSerde.TYPE_NAME, NestedDataComplexTypeSerde.INSTANCE); + if (formatsConfig.getNestedColumnFormatVersion() != null && formatsConfig.getNestedColumnFormatVersion() == 4) { + DimensionHandlerUtils.registerDimensionHandlerProvider( + NestedDataComplexTypeSerde.TYPE_NAME, + new NestedColumnV4HandlerProvider() + ); + } else { + DimensionHandlerUtils.registerDimensionHandlerProvider( + NestedDataComplexTypeSerde.TYPE_NAME, + new NestedCommonFormatHandlerProvider() + ); } - DimensionHandlerUtils.registerDimensionHandlerProvider( - NestedDataComplexTypeSerde.TYPE_NAME, - NestedCommonFormatColumnHandler::new - ); + return new SideEffectHandlerRegisterer(); } public static List getJacksonModulesList() @@ -71,4 +84,56 @@ public static List getJacksonModulesList() .addSerializer(StructuredData.class, new StructuredDataJsonSerializer()) ); } + + /** + * Helper for wiring stuff up for tests + */ + @VisibleForTesting + public static void registerHandlersAndSerde() + { + registerSerde(); + DimensionHandlerUtils.registerDimensionHandlerProvider( + NestedDataComplexTypeSerde.TYPE_NAME, + new NestedCommonFormatHandlerProvider() + ); + } + + private static void registerSerde() + { + if (ComplexMetrics.getSerdeForType(NestedDataComplexTypeSerde.TYPE_NAME) == null) { + ComplexMetrics.registerSerde(NestedDataComplexTypeSerde.TYPE_NAME, NestedDataComplexTypeSerde.INSTANCE); + } + } + + public static class NestedCommonFormatHandlerProvider + implements DimensionHandlerProvider + { + + @Override + public DimensionHandler get(String dimensionName) + { + return new NestedCommonFormatColumnHandler(dimensionName); + } + } + + public static class NestedColumnV4HandlerProvider + implements DimensionHandlerProvider + { + + @Override + public DimensionHandler get(String dimensionName) + { + return new NestedDataColumnHandlerV4(dimensionName); + } + } + /** + * this is used as a vehicle to register the correct version of the system default nested column handler by side + * effect with the help of binding to {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} so that + * {@link #registerHandler(DefaultColumnFormatConfig)} can be called with the injected + * {@link DefaultColumnFormatConfig}. + */ + public static class SideEffectHandlerRegisterer + { + // nothing to see here + } } diff --git a/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java new file mode 100644 index 000000000000..3f41fb88c032 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/DefaultColumnFormatConfig.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.DruidException; + +import javax.annotation.Nullable; +import java.util.Objects; + +public class DefaultColumnFormatConfig +{ + public static void validateNestedFormatVersion(@Nullable Integer formatVersion) + { + if (formatVersion != null) { + if (formatVersion < 4 || formatVersion > 5) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("Unsupported nested column format version[%s]", formatVersion); + } + } + } + + @JsonProperty("nestedColumnFormatVersion") + private final Integer nestedColumnFormatVersion; + + @JsonCreator + public DefaultColumnFormatConfig( + @JsonProperty("nestedColumnFormatVersion") @Nullable Integer nestedColumnFormatVersion + ) + { + this.nestedColumnFormatVersion = nestedColumnFormatVersion; + validateNestedFormatVersion(this.nestedColumnFormatVersion); + } + + @Nullable + @JsonProperty("nestedColumnFormatVersion") + public Integer getNestedColumnFormatVersion() + { + return nestedColumnFormatVersion; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultColumnFormatConfig that = (DefaultColumnFormatConfig) o; + return Objects.equals(nestedColumnFormatVersion, that.nestedColumnFormatVersion); + } + + @Override + public int hashCode() + { + return Objects.hash(nestedColumnFormatVersion); + } + + @Override + public String toString() + { + return "DefaultColumnFormatConfig{" + + "nestedColumnFormatVersion=" + nestedColumnFormatVersion + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java new file mode 100644 index 000000000000..a63a18eac7d3 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnHandlerV4.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.nested.StructuredData; +import org.apache.druid.segment.selector.settable.SettableColumnValueSelector; +import org.apache.druid.segment.selector.settable.SettableObjectColumnValueSelector; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import java.util.Comparator; + +public class NestedDataColumnHandlerV4 implements DimensionHandler +{ + private static Comparator COMPARATOR = (s1, s2) -> + StructuredData.COMPARATOR.compare( + StructuredData.wrap(s1.getObject()), + StructuredData.wrap(s2.getObject()) + ); + + private final String name; + + public NestedDataColumnHandlerV4(String name) + { + this.name = name; + } + + @Override + public String getDimensionName() + { + return name; + } + + @Override + public DimensionSpec getDimensionSpec() + { + return new DefaultDimensionSpec(name, name, ColumnType.NESTED_DATA); + } + + @Override + public DimensionSchema getDimensionSchema(ColumnCapabilities capabilities) + { + return new NestedDataColumnSchema(name, 4); + } + + @Override + public DimensionIndexer makeIndexer(boolean useMaxMemoryEstimates) + { + return new NestedDataColumnIndexerV4(); + } + + @Override + public DimensionMergerV9 makeMerger( + IndexSpec indexSpec, + SegmentWriteOutMedium segmentWriteOutMedium, + ColumnCapabilities capabilities, + ProgressIndicator progress, + Closer closer + ) + { + return new NestedDataColumnMergerV4(name, indexSpec, segmentWriteOutMedium, closer); + } + + @Override + public int getLengthOfEncodedKeyComponent(StructuredData dimVals) + { + // this is called in one place, OnheapIncrementalIndex, where returning 0 here means the value is null + // so the actual value we return here doesn't matter. we should consider refactoring this to a boolean + return 1; + } + + @Override + public Comparator getEncodedValueSelectorComparator() + { + return COMPARATOR; + } + + @Override + public SettableColumnValueSelector makeNewSettableEncodedValueSelector() + { + return new SettableObjectColumnValueSelector(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexerV4.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexerV4.java new file mode 100644 index 000000000000..ecaf89e84836 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexerV4.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.collections.bitmap.BitmapFactory; +import org.apache.druid.collections.bitmap.MutableBitmap; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.math.expr.ExpressionType; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.column.ColumnFormat; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.data.CloseableIndexed; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexRowHolder; +import org.apache.druid.segment.nested.FieldTypeInfo; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; +import org.apache.druid.segment.nested.NestedPathFinder; +import org.apache.druid.segment.nested.NestedPathPart; +import org.apache.druid.segment.nested.SortedValueDictionary; +import org.apache.druid.segment.nested.StructuredData; +import org.apache.druid.segment.nested.StructuredDataProcessor; +import org.apache.druid.segment.nested.ValueDictionary; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; + +public class NestedDataColumnIndexerV4 implements DimensionIndexer +{ + private static final ColumnFormat FORMAT = new NestedDataComplexTypeSerde.NestedColumnFormatV4(); + + protected volatile boolean hasNulls = false; + + protected SortedMap fieldIndexers = new TreeMap<>(); + protected final ValueDictionary globalDictionary = new ValueDictionary(); + + int estimatedFieldKeySize = 0; + + protected final StructuredDataProcessor indexerProcessor = new StructuredDataProcessor() + { + @Override + public ProcessedValue processField(ArrayList fieldPath, @Nullable Object fieldValue) + { + // null value is always added to the global dictionary as id 0, so we can ignore them here + if (fieldValue != null) { + final String fieldName = NestedPathFinder.toNormalizedJsonPath(fieldPath); + ExprEval eval = ExprEval.bestEffortOf(fieldValue); + FieldIndexer fieldIndexer = fieldIndexers.get(fieldName); + if (fieldIndexer == null) { + estimatedFieldKeySize += StructuredDataProcessor.estimateStringSize(fieldName); + fieldIndexer = new FieldIndexer(globalDictionary); + fieldIndexers.put(fieldName, fieldIndexer); + } + return fieldIndexer.processValue(eval); + } + return ProcessedValue.NULL_LITERAL; + } + + @Nullable + @Override + public ProcessedValue processArrayField( + ArrayList fieldPath, + @Nullable List array + ) + { + // classic nested data column indexer does not handle arrays + return null; + } + }; + + @Override + public EncodedKeyComponent processRowValsToUnsortedEncodedKeyComponent( + @Nullable Object dimValues, + boolean reportParseExceptions + ) + { + final long oldDictSizeInBytes = globalDictionary.sizeInBytes(); + final int oldFieldKeySize = estimatedFieldKeySize; + final StructuredData data; + if (dimValues == null) { + hasNulls = true; + data = null; + } else if (dimValues instanceof StructuredData) { + data = (StructuredData) dimValues; + } else { + data = new StructuredData(dimValues); + } + StructuredDataProcessor.ProcessResults info = indexerProcessor.processFields(data == null ? null : data.getValue()); + // 'raw' data is currently preserved 'as-is', and not replaced with object references to the global dictionaries + long effectiveSizeBytes = info.getEstimatedSize(); + // then, we add the delta of size change to the global dictionaries to account for any new space added by the + // 'raw' data + effectiveSizeBytes += (globalDictionary.sizeInBytes() - oldDictSizeInBytes); + effectiveSizeBytes += (estimatedFieldKeySize - oldFieldKeySize); + return new EncodedKeyComponent<>(data, effectiveSizeBytes); + } + + @Override + public void setSparseIndexed() + { + this.hasNulls = true; + } + + @Override + public StructuredData getUnsortedEncodedValueFromSorted(StructuredData sortedIntermediateValue) + { + return sortedIntermediateValue; + } + + @Override + public CloseableIndexed getSortedIndexedValues() + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public StructuredData getMinValue() + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public StructuredData getMaxValue() + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public int getCardinality() + { + return globalDictionary.getCardinality(); + } + + @Override + public DimensionSelector makeDimensionSelector( + DimensionSpec spec, + IncrementalIndexRowHolder currEntry, + IncrementalIndex.DimensionDesc desc + ) + { + final int dimIndex = desc.getIndex(); + final ColumnValueSelector rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex); + if (rootLiteralSelector != null) { + return new BaseSingleValueDimensionSelector() + { + @Nullable + @Override + protected String getValue() + { + final Object o = rootLiteralSelector.getObject(); + if (o == null) { + return null; + } + return o.toString(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + }; + } + // column has nested data or is of mixed root type, cannot use + throw new UOE( + "makeDimensionSelector is not supported, column [%s] is [%s] typed and should only use makeColumnValueSelector", + spec.getOutputName(), + ColumnType.NESTED_DATA + ); + } + + @Override + public ColumnValueSelector makeColumnValueSelector( + IncrementalIndexRowHolder currEntry, + IncrementalIndex.DimensionDesc desc + ) + { + final int dimIndex = desc.getIndex(); + final ColumnValueSelector rootLiteralSelector = getRootLiteralValueSelector(currEntry, dimIndex); + if (rootLiteralSelector != null) { + return rootLiteralSelector; + } + + return new ObjectColumnSelector() + { + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Nullable + @Override + public StructuredData getObject() + { + final Object[] dims = currEntry.get().getDims(); + if (0 <= dimIndex && dimIndex < dims.length) { + return (StructuredData) dims[dimIndex]; + } else { + return null; + } + } + + @Override + public Class classOfObject() + { + return StructuredData.class; + } + }; + } + private ColumnType getLogicalType() + { + if (fieldIndexers.size() == 1 && fieldIndexers.containsKey(NestedPathFinder.JSON_PATH_ROOT)) { + FieldIndexer rootField = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + ColumnType singleType = rootField.getTypes().getSingleType(); + return singleType == null ? ColumnType.NESTED_DATA : singleType; + } + return ColumnType.NESTED_DATA; + } + + @Override + public ColumnCapabilities getColumnCapabilities() + { + return ColumnCapabilitiesImpl.createDefault() + .setType(getLogicalType()) + .setHasNulls(hasNulls); + } + + @Override + public ColumnFormat getFormat() + { + return FORMAT; + } + + public SortedValueDictionary getSortedValueLookups() + { + return globalDictionary.getSortedCollector(); + } + + public SortedMap getFieldTypeInfo() + { + TreeMap fields = new TreeMap<>(); + for (Map.Entry entry : fieldIndexers.entrySet()) { + // skip adding the field if no types are in the set, meaning only null values have been processed + if (!entry.getValue().getTypes().isEmpty()) { + fields.put(entry.getKey(), entry.getValue().getTypes()); + } + } + return fields; + } + + @Override + public int compareUnsortedEncodedKeyComponents( + @Nullable StructuredData lhs, + @Nullable StructuredData rhs + ) + { + return StructuredData.COMPARATOR.compare(lhs, rhs); + } + + @Override + public boolean checkUnsortedEncodedKeyComponentsEqual( + @Nullable StructuredData lhs, + @Nullable StructuredData rhs + ) + { + return Objects.equals(lhs, rhs); + } + + @Override + public int getUnsortedEncodedKeyComponentHashCode(@Nullable StructuredData key) + { + return Objects.hash(key); + } + + @Override + public Object convertUnsortedEncodedKeyComponentToActualList(StructuredData key) + { + return key; + } + + @Override + public ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues) + { + final FieldIndexer rootIndexer = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + if (fieldIndexers.size() == 1 && rootIndexer != null && rootIndexer.isSingleType()) { + // for root only literals, makeColumnValueSelector and makeDimensionSelector automatically unwrap StructuredData + // we need to do the opposite here, wrapping selector values with a StructuredData so that they are consistently + // typed for the merger + return new ColumnValueSelector() + { + @Override + public boolean isNull() + { + return selectorWithUnsortedValues.isNull(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + selectorWithUnsortedValues.inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public StructuredData getObject() + { + return StructuredData.wrap(selectorWithUnsortedValues.getObject()); + } + + @Override + public float getFloat() + { + return selectorWithUnsortedValues.getFloat(); + } + + @Override + public double getDouble() + { + return selectorWithUnsortedValues.getDouble(); + } + + @Override + public long getLong() + { + return selectorWithUnsortedValues.getLong(); + } + + @Override + public Class classOfObject() + { + return StructuredData.class; + } + }; + } + return selectorWithUnsortedValues; + } + + @Override + public void fillBitmapsFromUnsortedEncodedKeyComponent( + StructuredData key, + int rowNum, + MutableBitmap[] bitmapIndexes, + BitmapFactory factory + ) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Nullable + private ColumnValueSelector getRootLiteralValueSelector( + IncrementalIndexRowHolder currEntry, + int dimIndex + ) + { + if (fieldIndexers.size() > 1) { + return null; + } + final FieldIndexer root = fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT); + if (root == null || !root.isSingleType()) { + return null; + } + return new ColumnValueSelector() + { + @Override + public boolean isNull() + { + final Object o = getObject(); + return !(o instanceof Number); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return ((Number) value).floatValue(); + } + + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return ((Number) value).doubleValue(); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return ((Number) value).longValue(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Nullable + @Override + public Object getObject() + { + final Object[] dims = currEntry.get().getDims(); + if (0 <= dimIndex && dimIndex < dims.length) { + final StructuredData data = (StructuredData) dims[dimIndex]; + if (data != null) { + return ExprEval.bestEffortOf(data.getValue()).valueOrDefault(); + } + } + + return null; + } + + @Override + public Class classOfObject() + { + return Object.class; + } + }; + } + + static class FieldIndexer + { + private final ValueDictionary valueDictionary; + private final FieldTypeInfo.MutableTypeSet typeSet; + + FieldIndexer(ValueDictionary valueDictionary) + { + this.valueDictionary = valueDictionary; + this.typeSet = new FieldTypeInfo.MutableTypeSet(); + } + + private StructuredDataProcessor.ProcessedValue processValue(ExprEval eval) + { + final ColumnType columnType = ExpressionType.toColumnType(eval.type()); + int sizeEstimate; + switch (columnType.getType()) { + case LONG: + typeSet.add(ColumnType.LONG); + sizeEstimate = valueDictionary.addLongValue(eval.asLong()); + return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(), sizeEstimate); + case DOUBLE: + typeSet.add(ColumnType.DOUBLE); + sizeEstimate = valueDictionary.addDoubleValue(eval.asDouble()); + return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(), sizeEstimate); + case STRING: + typeSet.add(ColumnType.STRING); + final String asString = eval.asString(); + sizeEstimate = valueDictionary.addStringValue(asString); + return new StructuredDataProcessor.ProcessedValue<>(asString, sizeEstimate); + default: + throw new IAE("Unhandled type: %s", columnType); + } + } + + public FieldTypeInfo.MutableTypeSet getTypes() + { + return typeSet; + } + + public boolean isSingleType() + { + return typeSet.getSingleType() != null; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java new file mode 100644 index 000000000000..9a0dc139fbf7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMergerV4.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.PeekingIterator; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.column.ColumnDescriptor; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.nested.FieldTypeInfo; +import org.apache.druid.segment.nested.NestedDataColumnSerializerV4; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; +import org.apache.druid.segment.nested.SortedValueDictionary; +import org.apache.druid.segment.serde.ComplexColumnPartSerde; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.IntBuffer; +import java.util.Comparator; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +public class NestedDataColumnMergerV4 implements DimensionMergerV9 +{ + private static final Logger log = new Logger(NestedDataColumnMergerV4.class); + + public static final Comparator> STRING_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + public static final Comparator> LONG_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + public static final Comparator> DOUBLE_MERGING_COMPARATOR = + SimpleDictionaryMergingIterator.makePeekingComparator(); + + private final String name; + private final IndexSpec indexSpec; + private final SegmentWriteOutMedium segmentWriteOutMedium; + private final Closer closer; + + private ColumnDescriptor.Builder descriptorBuilder; + private GenericColumnSerializer serializer; + + public NestedDataColumnMergerV4( + String name, + IndexSpec indexSpec, + SegmentWriteOutMedium segmentWriteOutMedium, + Closer closer + ) + { + + this.name = name; + this.indexSpec = indexSpec; + this.segmentWriteOutMedium = segmentWriteOutMedium; + this.closer = closer; + } + + @Override + public void writeMergedValueDictionary(List adapters) throws IOException + { + try { + long dimStartTime = System.currentTimeMillis(); + + int numMergeIndex = 0; + SortedValueDictionary sortedLookup = null; + final Indexed[] sortedLookups = new Indexed[adapters.size()]; + final Indexed[] sortedLongLookups = new Indexed[adapters.size()]; + final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()]; + + final SortedMap mergedFields = new TreeMap<>(); + + for (int i = 0; i < adapters.size(); i++) { + final IndexableAdapter adapter = adapters.get(i); + + final IndexableAdapter.NestedColumnMergable mergable = closer.register( + adapter.getNestedColumnMergeables(name) + ); + if (mergable == null) { + continue; + } + final SortedValueDictionary dimValues = mergable.getValueDictionary(); + + boolean allNulls = dimValues == null || dimValues.allNull(); + if (!allNulls) { + sortedLookup = dimValues; + mergable.mergeFieldsInto(mergedFields); + sortedLookups[i] = dimValues.getSortedStrings(); + sortedLongLookups[i] = dimValues.getSortedLongs(); + sortedDoubleLookups[i] = dimValues.getSortedDoubles(); + numMergeIndex++; + } + } + + descriptorBuilder = new ColumnDescriptor.Builder(); + + final NestedDataColumnSerializerV4 defaultSerializer = new NestedDataColumnSerializerV4( + name, + indexSpec, + segmentWriteOutMedium, + closer + ); + serializer = defaultSerializer; + + final ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.serializerBuilder() + .withTypeName(NestedDataComplexTypeSerde.TYPE_NAME) + .withDelegate(serializer) + .build(); + descriptorBuilder.setValueType(ValueType.COMPLEX) + .setHasMultipleValues(false) + .addSerde(partSerde); + + defaultSerializer.open(); + defaultSerializer.serializeFields(mergedFields); + + int stringCardinality; + int longCardinality; + int doubleCardinality; + if (numMergeIndex == 1) { + defaultSerializer.serializeDictionaries( + sortedLookup.getSortedStrings(), + sortedLookup.getSortedLongs(), + sortedLookup.getSortedDoubles() + ); + stringCardinality = sortedLookup.getStringCardinality(); + longCardinality = sortedLookup.getLongCardinality(); + doubleCardinality = sortedLookup.getDoubleCardinality(); + } else { + final SimpleDictionaryMergingIterator stringIterator = new SimpleDictionaryMergingIterator<>( + sortedLookups, + STRING_MERGING_COMPARATOR + ); + final SimpleDictionaryMergingIterator longIterator = new SimpleDictionaryMergingIterator<>( + sortedLongLookups, + LONG_MERGING_COMPARATOR + ); + final SimpleDictionaryMergingIterator doubleIterator = new SimpleDictionaryMergingIterator<>( + sortedDoubleLookups, + DOUBLE_MERGING_COMPARATOR + ); + defaultSerializer.serializeDictionaries( + () -> stringIterator, + () -> longIterator, + () -> doubleIterator + ); + stringCardinality = stringIterator.getCardinality(); + longCardinality = longIterator.getCardinality(); + doubleCardinality = doubleIterator.getCardinality(); + } + + log.debug( + "Completed dim[%s] conversions with string cardinality[%,d], long cardinality[%,d], double cardinality[%,d] in %,d millis.", + name, + stringCardinality, + longCardinality, + doubleCardinality, + System.currentTimeMillis() - dimStartTime + ); + } + catch (IOException ioe) { + log.error(ioe, "Failed to merge dictionary for column [%s]", name); + throw ioe; + } + } + + @Override + public ColumnValueSelector convertSortedSegmentRowValuesToMergedRowValues( + int segmentIndex, + ColumnValueSelector source + ) + { + return source; + } + + @Override + public void processMergedRow(ColumnValueSelector selector) throws IOException + { + serializer.serialize(selector); + } + + @Override + public void writeIndexes(@Nullable List segmentRowNumConversions) + { + // fields write their own indexes + } + + @Override + public boolean hasOnlyNulls() + { + return false; + } + + @Override + public ColumnDescriptor makeColumnDescriptor() + { + return descriptorBuilder.build(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java new file mode 100644 index 000000000000..acde3af49bfc --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnSchema.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Nested column {@link DimensionSchema}. If {@link #formatVersion} is set to 4, or null and + * {@link DefaultColumnFormatConfig#nestedColumnFormatVersion} is set to 4, then {@link NestedDataColumnHandlerV4} is + * used, else {@link NestedCommonFormatColumnHandler} is used instead and this is equivalent to using + * {@link AutoTypeColumnSchema} + */ +public class NestedDataColumnSchema extends DimensionSchema +{ + final int formatVersion; + + @JsonCreator + public NestedDataColumnSchema( + @JsonProperty("name") String name, + @JsonProperty("formatVersion") @Nullable Integer version, + @JacksonInject DefaultColumnFormatConfig defaultFormatConfig + ) + { + super(name, null, true); + if (version != null) { + formatVersion = version; + } else if (defaultFormatConfig.getNestedColumnFormatVersion() != null) { + formatVersion = defaultFormatConfig.getNestedColumnFormatVersion(); + } else { + // this is sort of a lie... it's not really v5 in the segment, rather its v0 of the 'nested common format' + // but as far as this is concerned it is v5 + formatVersion = 5; + } + DefaultColumnFormatConfig.validateNestedFormatVersion(this.formatVersion); + } + + public NestedDataColumnSchema( + String name, + int version + ) + { + super(name, null, true); + this.formatVersion = version; + DefaultColumnFormatConfig.validateNestedFormatVersion(this.formatVersion); + } + + @JsonProperty("formatVersion") + public int getFormatVersion() + { + return formatVersion; + } + + @Override + public String getTypeName() + { + return NestedDataComplexTypeSerde.TYPE_NAME; + } + + @Override + public ColumnType getColumnType() + { + return ColumnType.NESTED_DATA; + } + + @Override + public DimensionHandler getDimensionHandler() + { + if (formatVersion == 4) { + return new NestedDataColumnHandlerV4(getName()); + } else { + return new NestedCommonFormatColumnHandler(getName()); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + NestedDataColumnSchema that = (NestedDataColumnSchema) o; + return Objects.equals(formatVersion, that.formatVersion); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), formatVersion); + } + + @Override + public String toString() + { + return "NestedDataColumnSchema{" + + "formatVersion=" + formatVersion + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java index aaaedac580fb..a61ff334bb9e 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java @@ -27,6 +27,7 @@ import org.apache.druid.segment.IndexableAdapter; import org.apache.druid.segment.IntIteratorUtils; import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.NestedDataColumnIndexerV4; import org.apache.druid.segment.TransformableRowIterator; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnFormat; @@ -150,6 +151,18 @@ public NestedColumnMergable getNestedColumnMergeables(String column) } final DimensionIndexer indexer = accessor.dimensionDesc.getIndexer(); + if (indexer instanceof NestedDataColumnIndexerV4) { + NestedDataColumnIndexerV4 nestedDataColumnIndexer = (NestedDataColumnIndexerV4) indexer; + + return new NestedColumnMergable( + nestedDataColumnIndexer.getSortedValueLookups(), + nestedDataColumnIndexer.getFieldTypeInfo(), + true, + false, + null + ); + } + if (indexer instanceof AutoTypeColumnIndexer) { AutoTypeColumnIndexer autoIndexer = (AutoTypeColumnIndexer) indexer; return new NestedColumnMergable( diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 9ff678ee10d6..565ec3294fba 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -34,11 +34,13 @@ import org.apache.druid.segment.DimensionDictionarySelector; import org.apache.druid.segment.DimensionIndexer; import org.apache.druid.segment.Metadata; +import org.apache.druid.segment.NestedDataColumnIndexerV4; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.Indexed; import org.apache.druid.segment.data.ListIndexed; import org.apache.druid.segment.filter.BooleanValueMatcher; @@ -207,6 +209,13 @@ public Comparable getMaxValue(String column) @Override public ColumnCapabilities getColumnCapabilities(String column) { + IncrementalIndex.DimensionDesc desc = index.getDimension(column); + // nested column indexer is a liar, and behaves like any type if it only processes unnested literals of a single + // type, so force it to use nested column type + if (desc != null && desc.getIndexer() instanceof NestedDataColumnIndexerV4) { + return ColumnCapabilitiesImpl.createDefault().setType(ColumnType.NESTED_DATA); + } + // Different from index.getColumnCapabilities because, in a way, IncrementalIndex's string-typed dimensions // are always potentially multi-valued at query time. (Missing / null values for a row can potentially be // represented by an empty array; see StringDimensionIndexer.IndexerDimensionSelector's getRow method.) diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java new file mode 100644 index 000000000000..f10efd2d2413 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializerV4.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.nested; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.druid.collections.bitmap.ImmutableBitmap; +import org.apache.druid.collections.bitmap.MutableBitmap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprEval; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.GenericColumnSerializer; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.StringEncodingStrategies; +import org.apache.druid.segment.column.Types; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.ByteBufferWriter; +import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer; +import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.data.DictionaryWriter; +import org.apache.druid.segment.data.FixedIndexedWriter; +import org.apache.druid.segment.data.GenericIndexed; +import org.apache.druid.segment.data.GenericIndexedWriter; +import org.apache.druid.segment.serde.Serializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +public class NestedDataColumnSerializerV4 implements GenericColumnSerializer +{ + private static final Logger log = new Logger(NestedDataColumnSerializerV4.class); + public static final String STRING_DICTIONARY_FILE_NAME = "__stringDictionary"; + public static final String LONG_DICTIONARY_FILE_NAME = "__longDictionary"; + public static final String DOUBLE_DICTIONARY_FILE_NAME = "__doubleDictionary"; + public static final String RAW_FILE_NAME = "__raw"; + public static final String NULL_BITMAP_FILE_NAME = "__nullIndex"; + public static final String NESTED_FIELD_PREFIX = "__field_"; + + private final String name; + private final SegmentWriteOutMedium segmentWriteOutMedium; + private final IndexSpec indexSpec; + @SuppressWarnings("unused") + private final Closer closer; + + private final StructuredDataProcessor fieldProcessor = new StructuredDataProcessor() + { + @Override + public ProcessedValue processField(ArrayList fieldPath, @Nullable Object fieldValue) + { + final GlobalDictionaryEncodedFieldColumnWriter writer = fieldWriters.get( + NestedPathFinder.toNormalizedJsonPath(fieldPath) + ); + if (writer != null) { + try { + final ExprEval eval = ExprEval.bestEffortOf(fieldValue); + if (eval.type().isPrimitive() || eval.type().isPrimitiveArray()) { + writer.addValue(rowCount, eval.value()); + } else { + // behave consistently with nested column indexer, which defaults to string + writer.addValue(rowCount, eval.asString()); + } + // serializer doesn't use size estimate + return ProcessedValue.NULL_LITERAL; + } + catch (IOException e) { + throw new RE(e, "Failed to write field [%s], unhandled value", fieldPath); + } + } + return ProcessedValue.NULL_LITERAL; + } + + @Nullable + @Override + public ProcessedValue processArrayField( + ArrayList fieldPath, + @Nullable List array + ) + { + // classic nested column ingestion does not support array fields + return null; + } + }; + + private byte[] metadataBytes; + private DictionaryIdLookup globalDictionaryIdLookup; + private SortedMap fields; + private GenericIndexedWriter fieldsWriter; + private FieldTypeInfo.Writer fieldsInfoWriter; + private DictionaryWriter dictionaryWriter; + private FixedIndexedWriter longDictionaryWriter; + private FixedIndexedWriter doubleDictionaryWriter; + private CompressedVariableSizedBlobColumnSerializer rawWriter; + private ByteBufferWriter nullBitmapWriter; + private MutableBitmap nullRowsBitmap; + private Map> fieldWriters; + private int rowCount = 0; + private boolean closedForWrite = false; + + private boolean dictionarySerialized = false; + + public NestedDataColumnSerializerV4( + String name, + IndexSpec indexSpec, + SegmentWriteOutMedium segmentWriteOutMedium, + Closer closer + ) + { + this.name = name; + this.segmentWriteOutMedium = segmentWriteOutMedium; + this.indexSpec = indexSpec; + this.closer = closer; + this.globalDictionaryIdLookup = new DictionaryIdLookup(); + } + + @Override + public void open() throws IOException + { + fieldsWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY); + fieldsWriter.open(); + + fieldsInfoWriter = new FieldTypeInfo.Writer(segmentWriteOutMedium); + fieldsInfoWriter.open(); + + dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter( + indexSpec.getStringDictionaryEncoding(), + segmentWriteOutMedium, + name + ); + dictionaryWriter.open(); + + longDictionaryWriter = new FixedIndexedWriter<>( + segmentWriteOutMedium, + ColumnType.LONG.getStrategy(), + ByteOrder.nativeOrder(), + Long.BYTES, + true + ); + longDictionaryWriter.open(); + + doubleDictionaryWriter = new FixedIndexedWriter<>( + segmentWriteOutMedium, + ColumnType.DOUBLE.getStrategy(), + ByteOrder.nativeOrder(), + Double.BYTES, + true + ); + doubleDictionaryWriter.open(); + + rawWriter = new CompressedVariableSizedBlobColumnSerializer( + getInternalFileName(name, RAW_FILE_NAME), + segmentWriteOutMedium, + indexSpec.getJsonCompression() != null ? indexSpec.getJsonCompression() : CompressionStrategy.LZ4 + ); + rawWriter.open(); + + nullBitmapWriter = new ByteBufferWriter<>( + segmentWriteOutMedium, + indexSpec.getBitmapSerdeFactory().getObjectStrategy() + ); + nullBitmapWriter.open(); + + nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap(); + } + + public void serializeFields(SortedMap fields) throws IOException + { + this.fields = fields; + this.fieldWriters = Maps.newHashMapWithExpectedSize(fields.size()); + int ctr = 0; + for (Map.Entry field : fields.entrySet()) { + final String fieldName = field.getKey(); + final String fieldFileName = NESTED_FIELD_PREFIX + ctr++; + fieldsWriter.write(fieldName); + fieldsInfoWriter.write(field.getValue()); + final GlobalDictionaryEncodedFieldColumnWriter writer; + final ColumnType type = field.getValue().getSingleType(); + if (type != null) { + if (Types.is(type, ValueType.STRING)) { + writer = new ScalarStringFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } else if (Types.is(type, ValueType.LONG)) { + writer = new ScalarLongFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } else if (Types.is(type, ValueType.DOUBLE)) { + writer = new ScalarDoubleFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } else { + throw new ISE("Invalid field type [%s], how did this happen?", type); + } + } else { + writer = new VariantFieldColumnWriter( + name, + fieldFileName, + segmentWriteOutMedium, + indexSpec, + globalDictionaryIdLookup + ); + } + writer.open(); + fieldWriters.put(fieldName, writer); + } + } + + public void serializeDictionaries( + Iterable strings, + Iterable longs, + Iterable doubles + ) throws IOException + { + if (dictionarySerialized) { + throw new ISE("String dictionary already serialized for column [%s], cannot serialize again", name); + } + + // null is always 0 + dictionaryWriter.write(null); + globalDictionaryIdLookup.addString(null); + for (String value : strings) { + value = NullHandling.emptyToNullIfNeeded(value); + if (value == null) { + continue; + } + + dictionaryWriter.write(value); + globalDictionaryIdLookup.addString(value); + } + dictionarySerialized = true; + + for (Long value : longs) { + if (value == null) { + continue; + } + longDictionaryWriter.write(value); + globalDictionaryIdLookup.addLong(value); + } + + for (Double value : doubles) { + if (value == null) { + continue; + } + doubleDictionaryWriter.write(value); + globalDictionaryIdLookup.addDouble(value); + } + dictionarySerialized = true; + } + + @Override + public void serialize(ColumnValueSelector selector) throws IOException + { + final StructuredData data = StructuredData.wrap(selector.getObject()); + if (data == null) { + nullRowsBitmap.add(rowCount); + } + rawWriter.addValue(NestedDataComplexTypeSerde.INSTANCE.toBytes(data)); + if (data != null) { + fieldProcessor.processFields(data.getValue()); + } + rowCount++; + } + + @Override + public long getSerializedSize() throws IOException + { + if (!closedForWrite) { + closedForWrite = true; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IndexMerger.SERIALIZER_UTILS.writeString( + baos, + NestedDataComplexTypeSerde.OBJECT_MAPPER.writeValueAsString( + new NestedDataColumnMetadata( + ByteOrder.nativeOrder(), + indexSpec.getBitmapSerdeFactory(), + name, + !nullRowsBitmap.isEmpty() + ) + ) + ); + this.metadataBytes = baos.toByteArray(); + this.nullBitmapWriter.write(nullRowsBitmap); + } + + long size = 1; + size += metadataBytes.length; + if (fieldsWriter != null) { + size += fieldsWriter.getSerializedSize(); + } + if (fieldsInfoWriter != null) { + size += fieldsInfoWriter.getSerializedSize(); + } + // the value dictionaries, raw column, and null index are all stored in separate files + return size; + } + + @Override + public void writeTo( + WritableByteChannel channel, + FileSmoosher smoosher + ) throws IOException + { + Preconditions.checkState(closedForWrite, "Not closed yet!"); + Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?"); + // version 5 + channel.write(ByteBuffer.wrap(new byte[]{0x04})); + channel.write(ByteBuffer.wrap(metadataBytes)); + fieldsWriter.writeTo(channel, smoosher); + fieldsInfoWriter.writeTo(channel, smoosher); + + // version 3 stores large components in separate files to prevent exceeding smoosh file limit (int max) + writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME); + writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME); + writeInternal(smoosher, doubleDictionaryWriter, DOUBLE_DICTIONARY_FILE_NAME); + writeInternal(smoosher, rawWriter, RAW_FILE_NAME); + if (!nullRowsBitmap.isEmpty()) { + writeInternal(smoosher, nullBitmapWriter, NULL_BITMAP_FILE_NAME); + } + + + // close the SmooshedWriter since we are done here, so we don't write to a temporary file per sub-column + // In the future, it would be best if the writeTo() itself didn't take a channel but was expected to actually + // open its own channels on the FileSmoosher object itself. Or some other thing that give this Serializer + // total control over when resources are opened up and when they are closed. Until then, we are stuck + // with a very tight coupling of this code with how the external "driver" is working. + if (channel instanceof SmooshedWriter) { + channel.close(); + } + + for (Map.Entry field : fields.entrySet()) { + // remove writer so that it can be collected when we are done with it + GlobalDictionaryEncodedFieldColumnWriter writer = fieldWriters.remove(field.getKey()); + writer.writeTo(rowCount, smoosher); + } + log.info("Column [%s] serialized successfully with [%d] nested columns.", name, fields.size()); + } + + private void writeInternal(FileSmoosher smoosher, Serializer serializer, String fileName) throws IOException + { + final String internalName = getInternalFileName(name, fileName); + try (SmooshedWriter smooshChannel = smoosher.addWithSmooshedWriter(internalName, serializer.getSerializedSize())) { + serializer.writeTo(smooshChannel, smoosher); + } + } + + public static String getInternalFileName(String fileNameBase, String field) + { + return StringUtils.format("%s.%s", fileNameBase, field); + } +} diff --git a/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java b/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java new file mode 100644 index 000000000000..59b5b2df4158 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/guice/NestedDataModuleTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.segment.DefaultColumnFormatConfig; +import org.apache.druid.segment.DimensionHandlerProvider; +import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.NestedCommonFormatColumnHandler; +import org.apache.druid.segment.NestedDataColumnHandlerV4; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.Properties; + +public class NestedDataModuleTest +{ + @Nullable + private static DimensionHandlerProvider DEFAULT_HANDLER_PROVIDER; + + @BeforeClass + public static void setup() + { + DEFAULT_HANDLER_PROVIDER = DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.get( + NestedDataComplexTypeSerde.TYPE_NAME + ); + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); + } + @AfterClass + public static void teardown() + { + if (DEFAULT_HANDLER_PROVIDER == null) { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); + } else { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.put( + NestedDataComplexTypeSerde.TYPE_NAME, + DEFAULT_HANDLER_PROVIDER + ); + } + } + + @Test + public void testDefaults() + { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); + Properties props = new Properties(); + Injector gadget = makeInjector(props); + + // side effects + gadget.getInstance(NestedDataModule.SideEffectHandlerRegisterer.class); + + DimensionHandlerProvider provider = DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.get( + NestedDataComplexTypeSerde.TYPE_NAME + ); + Assert.assertTrue(provider.get("test") instanceof NestedCommonFormatColumnHandler); + } + + @Test + public void testOverride() + { + DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.remove(NestedDataComplexTypeSerde.TYPE_NAME); + Properties props = new Properties(); + props.put("druid.indexing.formats.nestedColumnFormatVersion", "4"); + Injector gadget = makeInjector(props); + + // side effects + gadget.getInstance(NestedDataModule.SideEffectHandlerRegisterer.class); + + DimensionHandlerProvider provider = DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.get( + NestedDataComplexTypeSerde.TYPE_NAME + ); + Assert.assertTrue(provider.get("test") instanceof NestedDataColumnHandlerV4); + } + + private Injector makeInjector(Properties props) + { + + StartupInjectorBuilder bob = new StartupInjectorBuilder().forTests().withProperties(props); + + bob.addAll( + ImmutableList.of( + binder -> { + JsonConfigProvider.bind(binder, "druid.indexing.formats", DefaultColumnFormatConfig.class); + }, + new NestedDataModule() + ) + ); + + return bob.build(); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java index 2084af4b6a91..e5de63437358 100644 --- a/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java +++ b/processing/src/test/java/org/apache/druid/query/NestedDataTestUtils.java @@ -44,6 +44,7 @@ import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.NestedDataColumnSchema; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; @@ -113,6 +114,20 @@ public class NestedDataTestUtils ) ) .build(); + + public static final DimensionsSpec TSV_V4_SCHEMA = + DimensionsSpec.builder() + .setDimensions( + Arrays.asList( + new NestedDataColumnSchema("dim", 4), + new NestedDataColumnSchema("nest_json", 4), + new NestedDataColumnSchema("nester_json", 4), + new NestedDataColumnSchema("variant_json", 4), + new NestedDataColumnSchema("list_json", 4), + new NestedDataColumnSchema("nonexistent", 4) + ) + ) + .build(); public static final InputRowSchema AUTO_SCHEMA = new InputRowSchema( TIMESTAMP_SPEC, AUTO_DISCOVERY, @@ -169,6 +184,21 @@ public static List createSimpleSegmentsTsv( ); } + public static List createSimpleSegmentsTsvV4( + TemporaryFolder tempFolder, + Closer closer + ) + throws Exception + { + return createSimpleNestedTestDataTsvSegments( + tempFolder, + closer, + Granularities.NONE, + TSV_V4_SCHEMA, + true + ); + } + public static List createSimpleNestedTestDataTsvSegments( TemporaryFolder tempFolder, Closer closer, diff --git a/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java index d2666ee32ff8..0d91e7d5e001 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java @@ -265,6 +265,36 @@ public void testIngestAndScanSegmentsRealtimeWithFallback() throws Exception Assert.assertEquals(resultsSegments.get(0).getEvents().toString(), resultsRealtime.get(0).getEvents().toString()); } + @Test + public void testIngestAndScanSegmentsTsvV4() throws Exception + { + Query scanQuery = Druids.newScanQueryBuilder() + .dataSource("test_datasource") + .intervals( + new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.ETERNITY) + ) + ) + .virtualColumns( + new NestedFieldVirtualColumn("nest", "$.x", "x"), + new NestedFieldVirtualColumn("nester", "$.x[0]", "x_0"), + new NestedFieldVirtualColumn("nester", "$.y.c[1]", "y_c_1") + ) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .limit(100) + .context(ImmutableMap.of()) + .build(); + List segs = NestedDataTestUtils.createSimpleSegmentsTsvV4(tempFolder, closer); + + final Sequence seq = helper.runQueryOnSegmentsObjs(segs, scanQuery); + + List results = seq.toList(); + Assert.assertEquals(1, results.size()); + Assert.assertEquals(8, ((List) results.get(0).getEvents()).size()); + logResults(results); + } + + @Test public void testIngestAndScanSegmentsTsv() throws Exception { diff --git a/processing/src/test/java/org/apache/druid/segment/DefaultColumnFormatsConfigTest.java b/processing/src/test/java/org/apache/druid/segment/DefaultColumnFormatsConfigTest.java new file mode 100644 index 000000000000..f259fc1dc237 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/DefaultColumnFormatsConfigTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class DefaultColumnFormatsConfigTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testDefaultsSerde() throws JsonProcessingException + { + DefaultColumnFormatConfig defaultColumnFormatConfig = new DefaultColumnFormatConfig(null); + String there = MAPPER.writeValueAsString(defaultColumnFormatConfig); + DefaultColumnFormatConfig andBack = MAPPER.readValue(there, DefaultColumnFormatConfig.class); + Assert.assertEquals(defaultColumnFormatConfig, andBack); + Assert.assertNull(andBack.getNestedColumnFormatVersion()); + } + + @Test + public void testDefaultsSerdeOverride() throws JsonProcessingException + { + DefaultColumnFormatConfig defaultColumnFormatConfig = new DefaultColumnFormatConfig(4); + String there = MAPPER.writeValueAsString(defaultColumnFormatConfig); + DefaultColumnFormatConfig andBack = MAPPER.readValue(there, DefaultColumnFormatConfig.class); + Assert.assertEquals(defaultColumnFormatConfig, andBack); + Assert.assertEquals(4, (int) andBack.getNestedColumnFormatVersion()); + } + + @Test + public void testEqualsAndHashcode() + { + EqualsVerifier.forClass(DefaultColumnFormatConfig.class).usingGetClass().verify(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java new file mode 100644 index 000000000000..b43616b4a4a8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerV4Test.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; +import org.apache.druid.segment.nested.StructuredData; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.List; +import java.util.Map; + +public class NestedDataColumnIndexerV4Test extends InitializedNullHandlingTest +{ + private static final String TIME_COL = "time"; + private static final String STRING_COL = "string"; + private static final String STRING_ARRAY_COL = "string_array"; + private static final String LONG_COL = "long"; + private static final String DOUBLE_COL = "double"; + private static final String VARIANT_COL = "variant"; + private static final String NESTED_COL = "nested"; + + @BeforeClass + public static void setup() + { + NestedDataModule.registerHandlersAndSerde(); + } + + @Test + public void testKeySizeEstimation() + { + NestedDataColumnIndexerV4 indexer = new NestedDataColumnIndexerV4(); + int baseCardinality = NullHandling.sqlCompatible() ? 0 : 2; + Assert.assertEquals(baseCardinality, indexer.getCardinality()); + + EncodedKeyComponent key; + // new raw value, new field, new dictionary entry + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false); + Assert.assertEquals(228, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 1, indexer.getCardinality()); + // adding same value only adds estimated size of value itself + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableMap.of("x", "foo"), false); + Assert.assertEquals(112, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 1, indexer.getCardinality()); + // new raw value, new field, new dictionary entry + key = indexer.processRowValsToUnsortedEncodedKeyComponent(10L, false); + Assert.assertEquals(94, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 2, indexer.getCardinality()); + // adding same value only adds estimated size of value itself + key = indexer.processRowValsToUnsortedEncodedKeyComponent(10L, false); + Assert.assertEquals(16, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 2, indexer.getCardinality()); + // new raw value, new dictionary entry + key = indexer.processRowValsToUnsortedEncodedKeyComponent(11L, false); + Assert.assertEquals(48, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 3, indexer.getCardinality()); + + // new raw value, new fields + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(1L, 2L, 10L), false); + Assert.assertEquals(276, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + // new raw value, re-use fields and dictionary + key = indexer.processRowValsToUnsortedEncodedKeyComponent(ImmutableList.of(1L, 2L, 10L), false); + Assert.assertEquals(56, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + // new raw value, new fields + key = indexer.processRowValsToUnsortedEncodedKeyComponent( + ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), + false + ); + Assert.assertEquals(286, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + // new raw value + key = indexer.processRowValsToUnsortedEncodedKeyComponent( + ImmutableMap.of("x", ImmutableList.of(1L, 2L, 10L)), + false + ); + Assert.assertEquals(118, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 5, indexer.getCardinality()); + + key = indexer.processRowValsToUnsortedEncodedKeyComponent("", false); + if (NullHandling.replaceWithDefault()) { + Assert.assertEquals(0, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 6, indexer.getCardinality()); + } else { + Assert.assertEquals(104, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 6, indexer.getCardinality()); + } + + key = indexer.processRowValsToUnsortedEncodedKeyComponent(0, false); + if (NullHandling.replaceWithDefault()) { + Assert.assertEquals(16, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 6, indexer.getCardinality()); + } else { + Assert.assertEquals(48, key.getEffectiveSizeBytes()); + Assert.assertEquals(baseCardinality + 7, indexer.getCardinality()); + } + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootString() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, STRING_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, STRING_COL, "b")); + index.add(makeInputRow(minTimestamp + 3, true, STRING_COL, "c")); + index.add(makeInputRow(minTimestamp + 4, true, STRING_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, STRING_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_COL, STRING_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("a", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("a", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("a", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("b", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("b", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("b", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals("c", valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("c", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("c", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootLong() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, LONG_COL, 1L)); + index.add(makeInputRow(minTimestamp + 2, true, LONG_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, LONG_COL, 3L)); + index.add(makeInputRow(minTimestamp + 4, true, LONG_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, LONG_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(LONG_COL, LONG_COL, ColumnType.LONG); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(1L, valueSelector.getObject()); + Assert.assertEquals(1L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("1", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2L, valueSelector.getObject()); + Assert.assertEquals(2L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("2", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3L, valueSelector.getObject()); + Assert.assertEquals(3L, valueSelector.getLong()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultLongValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultLongValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultLongValue()), dimensionSelector.getObject()); + } + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(LONG_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultLongValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultLongValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultLongValue()), dimensionSelector.getObject()); + } + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootDouble() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, DOUBLE_COL, 1.1)); + index.add(makeInputRow(minTimestamp + 2, true, DOUBLE_COL, 2.2)); + index.add(makeInputRow(minTimestamp + 3, true, DOUBLE_COL, 3.3)); + index.add(makeInputRow(minTimestamp + 4, true, DOUBLE_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, DOUBLE_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(DOUBLE_COL, DOUBLE_COL, ColumnType.DOUBLE); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(1.1, valueSelector.getObject()); + Assert.assertEquals(1.1, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("1.1", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("1.1", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2.2, valueSelector.getObject()); + Assert.assertEquals(2.2, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("2.2", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("2.2", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3.3, valueSelector.getObject()); + Assert.assertEquals(3.3, valueSelector.getDouble(), 0.0); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals("3.3", dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertEquals("3.3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultDoubleValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultDoubleValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultDoubleValue()), dimensionSelector.getObject()); + } + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(DOUBLE_COL); + dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimensionSpec); + if (NullHandling.sqlCompatible()) { + Assert.assertNull(valueSelector.getObject()); + Assert.assertTrue(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertNull(dimensionSelector.lookupName(dimensionSelector.getRow().get(0))); + Assert.assertNull(dimensionSelector.getObject()); + } else { + Assert.assertEquals(NullHandling.defaultDoubleValue(), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals(1, dimensionSelector.getRow().size()); + Assert.assertEquals( + String.valueOf(NullHandling.defaultDoubleValue()), + dimensionSelector.lookupName(dimensionSelector.getRow().get(0)) + ); + Assert.assertEquals(String.valueOf(NullHandling.defaultDoubleValue()), dimensionSelector.getObject()); + } + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootStringArray() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, STRING_ARRAY_COL, new String[]{"a"})); + index.add(makeInputRow(minTimestamp + 2, true, STRING_ARRAY_COL, new Object[]{"b", "c"})); + index.add(makeInputRow(minTimestamp + 3, true, STRING_ARRAY_COL, ImmutableList.of("d", "e"))); + index.add(makeInputRow(minTimestamp + 4, true, STRING_ARRAY_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, STRING_ARRAY_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(STRING_ARRAY_COL, STRING_ARRAY_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertArrayEquals(new Object[]{"a"}, (Object[]) valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertArrayEquals(new Object[]{"b", "c"}, (Object[]) valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertArrayEquals(new Object[]{"d", "e"}, (Object[]) valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(STRING_ARRAY_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryRootVariant() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, VARIANT_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, VARIANT_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, VARIANT_COL, 3.3)); + index.add(makeInputRow(minTimestamp + 4, true, VARIANT_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, VARIANT_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(VARIANT_COL, VARIANT_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + DimensionSelector dimensionSelector = cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertEquals("a", valueSelector.getObject()); + Assert.assertEquals("a", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertEquals(2L, valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals("2", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertEquals(3.3, valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + Assert.assertEquals("3.3", dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertNull(dimensionSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(VARIANT_COL); + dimensionSelector = cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec); + Assert.assertNull(valueSelector.getObject()); + Assert.assertNull(dimensionSelector.getObject()); + } + + @Test + public void testNestedColumnIndexerSchemaDiscoveryNested() throws IndexSizeExceededException + { + long minTimestamp = System.currentTimeMillis(); + IncrementalIndex index = makeIncrementalIndex(minTimestamp); + + index.add(makeInputRow(minTimestamp + 1, true, NESTED_COL, "a")); + index.add(makeInputRow(minTimestamp + 2, true, NESTED_COL, 2L)); + index.add(makeInputRow(minTimestamp + 3, true, NESTED_COL, ImmutableMap.of("x", 1.1, "y", 2L))); + index.add(makeInputRow(minTimestamp + 4, true, NESTED_COL, null)); + index.add(makeInputRow(minTimestamp + 5, false, NESTED_COL, null)); + + IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(index); + Sequence cursorSequence = storageAdapter.makeCursors( + null, + Intervals.ETERNITY, + VirtualColumns.EMPTY, + Granularities.NONE, + false, + null + ); + final DimensionSpec dimensionSpec = new DefaultDimensionSpec(NESTED_COL, NESTED_COL, ColumnType.STRING); + List cursorList = cursorSequence.toList(); + ColumnSelectorFactory columnSelectorFactory = cursorList.get(0).getColumnSelectorFactory(); + + ColumnValueSelector valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(0).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap("a"), valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(1).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(1).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(2L), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(2).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(2).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertEquals(StructuredData.wrap(ImmutableMap.of("x", 1.1, "y", 2L)), valueSelector.getObject()); + Assert.assertFalse(valueSelector.isNull()); + + columnSelectorFactory = cursorList.get(3).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(3).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + + columnSelectorFactory = cursorList.get(4).getColumnSelectorFactory(); + valueSelector = columnSelectorFactory.makeColumnValueSelector(NESTED_COL); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> cursorList.get(4).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec) + ); + Assert.assertNull(valueSelector.getObject()); + } + + @Nonnull + private static IncrementalIndex makeIncrementalIndex(long minTimestamp) + { + IncrementalIndex index = new OnheapIncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema( + minTimestamp, + new TimestampSpec(TIME_COL, "millis", null), + Granularities.NONE, + VirtualColumns.EMPTY, + DimensionsSpec.builder().useSchemaDiscovery(true).build(), + new AggregatorFactory[0], + false + ) + ) + .setMaxRowCount(1000) + .build(); + return index; + } + + private MapBasedInputRow makeInputRow( + long timestamp, + boolean explicitNull, + Object... kv + ) + { + final Map event = TestHelper.makeMap(explicitNull, kv); + event.put("time", timestamp); + return new MapBasedInputRow(timestamp, ImmutableList.copyOf(event.keySet()), event); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java new file mode 100644 index 000000000000..8bfcfdefa4f4 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnSchemaTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class NestedDataColumnSchemaTest +{ + private static final DefaultColumnFormatConfig DEFAULT_CONFIG = new DefaultColumnFormatConfig(null); + private static final DefaultColumnFormatConfig DEFAULT_CONFIG_V4 = new DefaultColumnFormatConfig(4); + private static final ObjectMapper MAPPER; + private static final ObjectMapper MAPPER_V4; + + static { + MAPPER = new DefaultObjectMapper(); + MAPPER.setInjectableValues( + new InjectableValues.Std().addValue( + DefaultColumnFormatConfig.class, + DEFAULT_CONFIG + ) + ); + + MAPPER_V4 = new DefaultObjectMapper(); + MAPPER_V4.setInjectableValues( + new InjectableValues.Std().addValue( + DefaultColumnFormatConfig.class, + DEFAULT_CONFIG_V4 + ) + ); + } + + @Test + public void testSerdeRoundTrip() throws JsonProcessingException + { + final NestedDataColumnSchema v4 = new NestedDataColumnSchema("test", 4); + final NestedDataColumnSchema v5 = new NestedDataColumnSchema("test", 5); + Assert.assertEquals(v4, MAPPER.readValue(MAPPER.writeValueAsString(v4), NestedDataColumnSchema.class)); + Assert.assertEquals(v5, MAPPER.readValue(MAPPER.writeValueAsString(v5), NestedDataColumnSchema.class)); + } + + @Test + public void testSerdeDefault() throws JsonProcessingException + { + final String there = "{\"type\":\"json\", \"name\":\"test\"}"; + NestedDataColumnSchema andBack = MAPPER.readValue(there, NestedDataColumnSchema.class); + Assert.assertEquals(new NestedDataColumnSchema("test", 5), andBack); + } + + @Test + public void testSerdeSystemDefault() throws JsonProcessingException + { + final String there = "{\"type\":\"json\", \"name\":\"test\"}"; + NestedDataColumnSchema andBack = MAPPER_V4.readValue(there, NestedDataColumnSchema.class); + Assert.assertEquals(new NestedDataColumnSchema("test", 4), andBack); + } + + @Test + public void testSerdeOverride() throws JsonProcessingException + { + final String there = "{\"type\":\"json\", \"name\":\"test\",\"formatVersion\":4}"; + NestedDataColumnSchema andBack = MAPPER.readValue(there, NestedDataColumnSchema.class); + Assert.assertEquals(new NestedDataColumnSchema("test", 4), andBack); + } + + @Test + public void testVersionTooSmall() + { + Throwable t = Assert.assertThrows(DruidException.class, () -> new NestedDataColumnSchema("test", 3)); + Assert.assertEquals("Unsupported nested column format version[3]", t.getMessage()); + } + + @Test + public void testVersionTooBig() + { + Throwable t = Assert.assertThrows(DruidException.class, () -> new NestedDataColumnSchema("test", 6)); + Assert.assertEquals("Unsupported nested column format version[6]", t.getMessage()); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java index 3f5a6ef337f4..cb1a8270af7a 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java @@ -22,28 +22,49 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.bitmap.RoaringBitmapFactory; import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.NestedDataModule; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.io.smoosh.FileSmoosher; +import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter; import org.apache.druid.query.DefaultBitmapResultFactory; import org.apache.druid.query.filter.SelectorPredicateFactory; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.IndexableAdapter; +import org.apache.druid.segment.NestedDataColumnIndexerV4; +import org.apache.druid.segment.ObjectColumnSelector; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.TypeStrategy; import org.apache.druid.segment.index.BitmapColumnIndex; import org.apache.druid.segment.index.semantic.DruidPredicateIndexes; import org.apache.druid.segment.index.semantic.NullValueIndex; import org.apache.druid.segment.index.semantic.StringValueSetIndexes; +import org.apache.druid.segment.serde.ColumnPartSerde; +import org.apache.druid.segment.serde.ComplexColumnPartSerde; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.utils.CompressionUtils; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -52,22 +73,26 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest { private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); - private static final String NO_MATCH = "no"; - @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - DefaultBitmapResultFactory resultFactory = new DefaultBitmapResultFactory(new RoaringBitmapFactory()); - List> data = ImmutableList.of( TestHelper.makeMap("x", 1L, "y", 1.0, "z", "a", "v", "100", "nullish", "notnull"), TestHelper.makeMap("y", 3.0, "z", "d", "v", 1000L, "nullish", null), @@ -77,20 +102,178 @@ public class NestedDataColumnSupplierV4Test extends InitializedNullHandlingTest TestHelper.makeMap("x", 4L, "y", 2.0, "z", "e", "v", 11111L, "nullish", null) ); + List> arrayTestData = ImmutableList.of( + TestHelper.makeMap("s", new Object[]{"a", "b", "c"}, "l", new Object[]{1L, 2L, 3L}, "d", new Object[]{1.1, 2.2}), + TestHelper.makeMap( + "s", + new Object[]{null, "b", "c"}, + "l", + new Object[]{1L, null, 3L}, + "d", + new Object[]{2.2, 2.2} + ), + TestHelper.makeMap( + "s", + new Object[]{"b", "c"}, + "l", + new Object[]{null, null}, + "d", + new Object[]{1.1, null, 2.2} + ), + TestHelper.makeMap("s", new Object[]{"a", "b", "c", "d"}, "l", new Object[]{4L, 2L, 3L}), + TestHelper.makeMap("s", new Object[]{"d", "b", "c", "a"}, "d", new Object[]{1.1, 2.2}), + TestHelper.makeMap("l", new Object[]{1L, 2L, 3L}, "d", new Object[]{3.1, 2.2, 1.9}) + ); + Closer closer = Closer.create(); + SmooshedFileMapper fileMapper; + + ByteBuffer baseBuffer; + + SmooshedFileMapper arrayFileMapper; + + ByteBuffer arrayBaseBuffer; + @BeforeClass public static void staticSetup() { NestedDataModule.registerHandlersAndSerde(); } + @Before + public void setup() throws IOException + { + final String fileNameBase = "test"; + final String arrayFileNameBase = "array"; + fileMapper = smooshify(fileNameBase, tempFolder.newFolder(), data); + baseBuffer = fileMapper.mapFile(fileNameBase); + arrayFileMapper = smooshify(arrayFileNameBase, tempFolder.newFolder(), arrayTestData); + arrayBaseBuffer = arrayFileMapper.mapFile(arrayFileNameBase); + } + + private SmooshedFileMapper smooshify( + String fileNameBase, + File tmpFile, + List> data + ) + throws IOException + { + SegmentWriteOutMediumFactory writeOutMediumFactory = TmpFileSegmentWriteOutMediumFactory.instance(); + try (final FileSmoosher smoosher = new FileSmoosher(tmpFile)) { + NestedDataColumnSerializerV4 serializer = new NestedDataColumnSerializerV4( + fileNameBase, + IndexSpec.DEFAULT, + writeOutMediumFactory.makeSegmentWriteOutMedium(tempFolder.newFolder()), + closer + ); + + NestedDataColumnIndexerV4 indexer = new NestedDataColumnIndexerV4(); + for (Object o : data) { + indexer.processRowValsToUnsortedEncodedKeyComponent(o, false); + } + SortedMap sortedFields = new TreeMap<>(); + + IndexableAdapter.NestedColumnMergable mergable = closer.register( + new IndexableAdapter.NestedColumnMergable( + indexer.getSortedValueLookups(), + indexer.getFieldTypeInfo(), + true, + false, + null + ) + ); + SortedValueDictionary globalDictionarySortedCollector = mergable.getValueDictionary(); + mergable.mergeFieldsInto(sortedFields); + + serializer.open(); + serializer.serializeFields(sortedFields); + serializer.serializeDictionaries( + globalDictionarySortedCollector.getSortedStrings(), + globalDictionarySortedCollector.getSortedLongs(), + globalDictionarySortedCollector.getSortedDoubles() + ); + + SettableSelector valueSelector = new SettableSelector(); + for (Object o : data) { + valueSelector.setObject(StructuredData.wrap(o)); + serializer.serialize(valueSelector); + } + + try (SmooshedWriter writer = smoosher.addWithSmooshedWriter(fileNameBase, serializer.getSerializedSize())) { + serializer.writeTo(writer, smoosher); + } + smoosher.close(); + return closer.register(SmooshedFileMapper.load(tmpFile)); + } + } + @After public void teardown() throws IOException { closer.close(); } + @Test + public void testBasicFunctionality() throws IOException + { + ColumnBuilder bob = new ColumnBuilder(); + bob.setFileMapper(fileMapper); + ComplexColumnPartSerde partSerde = ComplexColumnPartSerde.createDeserializer(NestedDataComplexTypeSerde.TYPE_NAME); + ColumnPartSerde.Deserializer deserializer = partSerde.getDeserializer(); + deserializer.read(baseBuffer, bob, NestedFieldColumnIndexSupplierTest.ALWAYS_USE_INDEXES); + final ColumnHolder holder = bob.build(); + final ColumnCapabilities capabilities = holder.getCapabilities(); + Assert.assertEquals(ColumnType.NESTED_DATA, capabilities.toColumnType()); + Assert.assertTrue(holder.getColumnFormat() instanceof NestedDataComplexTypeSerde.NestedColumnFormatV4); + try (NestedDataComplexColumn column = (NestedDataComplexColumn) holder.getColumn()) { + smokeTest(column); + } + } + + @Test + public void testConcurrency() throws ExecutionException, InterruptedException + { + // if this test ever starts being to be a flake, there might be thread safety issues + ColumnBuilder bob = new ColumnBuilder(); + bob.setFileMapper(fileMapper); + NestedDataColumnSupplierV4 supplier = NestedDataColumnSupplierV4.read( + baseBuffer, + bob, + NestedFieldColumnIndexSupplierTest.ALWAYS_USE_INDEXES, + NestedDataComplexTypeSerde.OBJECT_MAPPER + ); + final String expectedReason = "none"; + final AtomicReference failureReason = new AtomicReference<>(expectedReason); + + final int threads = 10; + ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d") + ); + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) { + smokeTest(column); + } + } + } + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + @Test public void testLegacyV3ReaderFormat() throws IOException { @@ -106,34 +289,27 @@ public void testLegacyV3ReaderFormat() throws IOException ColumnHolder holder = theIndex.getColumnHolder(columnName); Assert.assertNotNull(holder); Assert.assertEquals(ColumnType.NESTED_DATA, holder.getCapabilities().toColumnType()); - NestedDataColumnV3 v3 = closer.register((NestedDataColumnV3) holder.getColumn()); Assert.assertNotNull(v3); - List path = ImmutableList.of(new NestedPathField("lastName")); ColumnHolder nestedColumnHolder = v3.getColumnHolder(path); Assert.assertNotNull(nestedColumnHolder); Assert.assertEquals(ColumnType.STRING, nestedColumnHolder.getCapabilities().toColumnType()); NestedFieldDictionaryEncodedColumn nestedColumn = (NestedFieldDictionaryEncodedColumn) nestedColumnHolder.getColumn(); - Assert.assertNotNull(nestedColumn); - ColumnValueSelector selector = nestedColumn.makeColumnValueSelector( new SimpleAscendingOffset(theIndex.getNumRows()) ); - ColumnIndexSupplier indexSupplier = v3.getColumnIndexSupplier(path); Assert.assertNotNull(indexSupplier); StringValueSetIndexes valueSetIndex = indexSupplier.as(StringValueSetIndexes.class); Assert.assertNotNull(valueSetIndex); - BitmapColumnIndex indexForValue = valueSetIndex.forValue(firstValue); Assert.assertEquals(firstValue, selector.getObject()); Assert.assertTrue(indexForValue.computeBitmapResult(resultFactory).get(0)); } } - @Test public void testLegacyV4ReaderFormat() throws IOException { @@ -150,39 +326,31 @@ public void testLegacyV4ReaderFormat() throws IOException ColumnHolder holder = theIndex.getColumnHolder(columnName); Assert.assertNotNull(holder); Assert.assertEquals(ColumnType.NESTED_DATA, holder.getCapabilities().toColumnType()); - NestedDataColumnV4 v4 = closer.register((NestedDataColumnV4) holder.getColumn()); Assert.assertNotNull(v4); - List path = ImmutableList.of(new NestedPathField("lastName")); ColumnHolder nestedColumnHolder = v4.getColumnHolder(path); Assert.assertNotNull(nestedColumnHolder); Assert.assertEquals(ColumnType.STRING, nestedColumnHolder.getCapabilities().toColumnType()); NestedFieldDictionaryEncodedColumn nestedColumn = (NestedFieldDictionaryEncodedColumn) nestedColumnHolder.getColumn(); - Assert.assertNotNull(nestedColumn); - ColumnValueSelector selector = nestedColumn.makeColumnValueSelector( new SimpleAscendingOffset(theIndex.getNumRows()) ); - ColumnIndexSupplier indexSupplier = v4.getColumnIndexSupplier(path); Assert.assertNotNull(indexSupplier); StringValueSetIndexes valueSetIndex = indexSupplier.as(StringValueSetIndexes.class); Assert.assertNotNull(valueSetIndex); - BitmapColumnIndex indexForValue = valueSetIndex.forValue(firstValue); Assert.assertEquals(firstValue, selector.getObject()); Assert.assertTrue(indexForValue.computeBitmapResult(resultFactory).get(0)); } } - private void smokeTest(NestedDataComplexColumn column) throws IOException { SimpleAscendingOffset offset = new SimpleAscendingOffset(data.size()); ColumnValueSelector rawSelector = column.makeColumnValueSelector(offset); - final List xPath = NestedPathFinder.parseJsonPath("$.x"); Assert.assertEquals(ImmutableSet.of(ColumnType.LONG), column.getColumnTypes(xPath)); Assert.assertEquals(ColumnType.LONG, column.getColumnHolder(xPath).getCapabilities().toColumnType()); @@ -193,7 +361,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes xValueIndex = xIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes xPredicateIndex = xIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex xNulls = xIndexSupplier.as(NullValueIndex.class); - final List yPath = NestedPathFinder.parseJsonPath("$.y"); Assert.assertEquals(ImmutableSet.of(ColumnType.DOUBLE), column.getColumnTypes(yPath)); Assert.assertEquals(ColumnType.DOUBLE, column.getColumnHolder(yPath).getCapabilities().toColumnType()); @@ -204,7 +371,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes yValueIndex = yIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes yPredicateIndex = yIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex yNulls = yIndexSupplier.as(NullValueIndex.class); - final List zPath = NestedPathFinder.parseJsonPath("$.z"); Assert.assertEquals(ImmutableSet.of(ColumnType.STRING), column.getColumnTypes(zPath)); Assert.assertEquals(ColumnType.STRING, column.getColumnHolder(zPath).getCapabilities().toColumnType()); @@ -215,7 +381,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes zValueIndex = zIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes zPredicateIndex = zIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex zNulls = zIndexSupplier.as(NullValueIndex.class); - final List vPath = NestedPathFinder.parseJsonPath("$.v"); Assert.assertEquals( ImmutableSet.of(ColumnType.STRING, ColumnType.LONG, ColumnType.DOUBLE), @@ -229,7 +394,6 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes vValueIndex = vIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes vPredicateIndex = vIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex vNulls = vIndexSupplier.as(NullValueIndex.class); - final List nullishPath = NestedPathFinder.parseJsonPath("$.nullish"); Assert.assertEquals(ImmutableSet.of(ColumnType.STRING), column.getColumnTypes(nullishPath)); Assert.assertEquals(ColumnType.STRING, column.getColumnHolder(nullishPath).getCapabilities().toColumnType()); @@ -240,16 +404,13 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException StringValueSetIndexes nullishValueIndex = nullishIndexSupplier.as(StringValueSetIndexes.class); DruidPredicateIndexes nullishPredicateIndex = nullishIndexSupplier.as(DruidPredicateIndexes.class); NullValueIndex nullishNulls = nullishIndexSupplier.as(NullValueIndex.class); - Assert.assertEquals(ImmutableList.of(nullishPath, vPath, xPath, yPath, zPath), column.getNestedFields()); - for (int i = 0; i < data.size(); i++) { Map row = data.get(i); Assert.assertEquals( JSON_MAPPER.writeValueAsString(row), JSON_MAPPER.writeValueAsString(StructuredData.unwrap(rawSelector.getObject())) ); - testPath(row, i, "v", vSelector, vDimSelector, vValueIndex, vPredicateIndex, vNulls, null); testPath(row, i, "x", xSelector, xDimSelector, xValueIndex, xPredicateIndex, xNulls, ColumnType.LONG); testPath(row, i, "y", ySelector, yDimSelector, yValueIndex, yPredicateIndex, yNulls, ColumnType.DOUBLE); @@ -265,11 +426,9 @@ private void smokeTest(NestedDataComplexColumn column) throws IOException nullishNulls, ColumnType.STRING ); - offset.increment(); } } - private void testPath( Map row, int rowNumber, @@ -287,7 +446,6 @@ private void testPath( // to take the null checking path final boolean isStringAndNullEquivalent = inputValue instanceof String && NullHandling.isNullOrEquivalent((String) inputValue); - if (row.containsKey(path) && inputValue != null && !isStringAndNullEquivalent) { Assert.assertEquals(inputValue, valueSelector.getObject()); if (ColumnType.LONG.equals(singleType)) { @@ -297,13 +455,11 @@ private void testPath( Assert.assertEquals((double) inputValue, valueSelector.getDouble(), 0.0); Assert.assertFalse(path + " is not null", valueSelector.isNull()); } - final String theString = String.valueOf(inputValue); Assert.assertEquals(theString, dimSelector.getObject()); String dimSelectorLookupVal = dimSelector.lookupName(dimSelector.getRow().get(0)); Assert.assertEquals(theString, dimSelectorLookupVal); Assert.assertEquals(dimSelector.idLookup().lookupId(dimSelectorLookupVal), dimSelector.getRow().get(0)); - Assert.assertTrue(valueSetIndex.forValue(theString).computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertTrue(valueSetIndex.forSortedValues(new TreeSet<>(ImmutableSet.of(theString))) .computeBitmapResult(resultFactory) @@ -319,7 +475,6 @@ private void testPath( .computeBitmapResult(resultFactory) .get(rowNumber)); Assert.assertFalse(nullValueIndex.get().computeBitmapResult(resultFactory).get(rowNumber)); - Assert.assertTrue(dimSelector.makeValueMatcher(theString).matches()); Assert.assertFalse(dimSelector.makeValueMatcher(NO_MATCH).matches()); Assert.assertTrue(dimSelector.makeValueMatcher(x -> Objects.equals(x, theString)).matches()); @@ -327,28 +482,104 @@ private void testPath( } else { Assert.assertNull(valueSelector.getObject()); Assert.assertTrue(path, valueSelector.isNull()); - Assert.assertEquals(0, dimSelector.getRow().get(0)); Assert.assertNull(dimSelector.getObject()); Assert.assertNull(dimSelector.lookupName(dimSelector.getRow().get(0))); - Assert.assertTrue(valueSetIndex.forValue(null).computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertTrue(nullValueIndex.get().computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertTrue(predicateIndex.forPredicate(new SelectorPredicateFactory(null)) .computeBitmapResult(resultFactory) .get(rowNumber)); Assert.assertFalse(valueSetIndex.forValue(NO_MATCH).computeBitmapResult(resultFactory).get(rowNumber)); - - Assert.assertFalse(valueSetIndex.forValue(NO_MATCH).computeBitmapResult(resultFactory).get(rowNumber)); Assert.assertFalse(predicateIndex.forPredicate(new SelectorPredicateFactory(NO_MATCH)) .computeBitmapResult(resultFactory) .get(rowNumber)); - Assert.assertTrue(dimSelector.makeValueMatcher((String) null).matches()); Assert.assertFalse(dimSelector.makeValueMatcher(NO_MATCH).matches()); Assert.assertTrue(dimSelector.makeValueMatcher(x -> x == null).matches()); Assert.assertFalse(dimSelector.makeValueMatcher(x -> Objects.equals(x, NO_MATCH)).matches()); } } + + private static class SettableSelector extends ObjectColumnSelector + { + private StructuredData data; + + public void setObject(StructuredData o) + { + this.data = o; + } + + @Nullable + @Override + public StructuredData getObject() + { + return data; + } + + @Override + public Class classOfObject() + { + return StructuredData.class; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + } + + private static class OnlyPositionalReadsTypeStrategy implements TypeStrategy + { + private final TypeStrategy delegate; + + private OnlyPositionalReadsTypeStrategy(TypeStrategy delegate) + { + this.delegate = delegate; + } + + @Override + public int estimateSizeBytes(T value) + { + return delegate.estimateSizeBytes(value); + } + + @Override + public T read(ByteBuffer buffer) + { + throw new IllegalStateException("non-positional read"); + } + + @Override + public boolean readRetainsBufferReference() + { + return delegate.readRetainsBufferReference(); + } + + @Override + public int write(ByteBuffer buffer, T value, int maxSizeBytes) + { + return delegate.write(buffer, value, maxSizeBytes); + } + + @Override + public T read(ByteBuffer buffer, int offset) + { + return delegate.read(buffer, offset); + } + + @Override + public int write(ByteBuffer buffer, int offset, T value, int maxSizeBytes) + { + return delegate.write(buffer, offset, value, maxSizeBytes); + } + + @Override + public int compare(Object o1, Object o2) + { + return delegate.compare(o1, o2); + } + } } diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java index 3ae5d2a3586f..e788949f66b9 100644 --- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java @@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.segment.DefaultColumnFormatConfig; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.StorageLocation; @@ -55,6 +56,7 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class); JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class); + JsonConfigProvider.bind(binder, "druid.indexing.formats", DefaultColumnFormatConfig.class); bindLocationSelectorStrategy(binder); binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null)); binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class).in(LazySingleton.class); diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java index fb440311a6e0..868eec46fd05 100644 --- a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java +++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java @@ -19,10 +19,9 @@ package org.apache.druid.cli; -import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.name.Names; @@ -30,6 +29,8 @@ import org.apache.druid.collections.bitmap.ImmutableBitmap; import org.apache.druid.collections.bitmap.RoaringBitmapFactory; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.NestedDataModule; import org.apache.druid.guice.StartupInjectorBuilder; import org.apache.druid.guice.annotations.Json; @@ -40,15 +41,19 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; -import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.DefaultColumnFormatConfig; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnConfig; @@ -56,7 +61,6 @@ import org.apache.druid.segment.column.ColumnIndexSupplier; import org.apache.druid.segment.index.semantic.DictionaryEncodedStringValueIndex; import org.apache.druid.testing.InitializedNullHandlingTest; -import org.apache.druid.timeline.SegmentId; import org.junit.After; import org.junit.Assert; import org.junit.Rule; @@ -66,16 +70,13 @@ import org.mockito.Mockito; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Collections; import java.util.List; public class DumpSegmentTest extends InitializedNullHandlingTest { - private final AggregationTestHelper helper; private final Closer closer; @Rule @@ -84,11 +85,6 @@ public class DumpSegmentTest extends InitializedNullHandlingTest public DumpSegmentTest() { NestedDataModule.registerHandlersAndSerde(); - List mods = NestedDataModule.getJacksonModulesList(); - this.helper = AggregationTestHelper.createScanQueryAggregationTestHelper( - mods, - tempFolder - ); this.closer = Closer.create(); } @@ -159,9 +155,16 @@ public void testDumpNestedColumn() throws Exception Injector injector = Mockito.mock(Injector.class); ObjectMapper mapper = TestHelper.makeJsonMapper(); mapper.registerModules(NestedDataModule.getJacksonModulesList()); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), mapper) + .addValue(DefaultColumnFormatConfig.class, new DefaultColumnFormatConfig(null)) + ); Mockito.when(injector.getInstance(Key.get(ObjectMapper.class, Json.class))).thenReturn(mapper); + Mockito.when(injector.getInstance(DefaultColumnFormatConfig.class)).thenReturn(new DefaultColumnFormatConfig(null)); - List segments = createSegments(helper, tempFolder, closer); + List segments = createSegments(tempFolder, closer); QueryableIndex queryableIndex = segments.get(0).asQueryableIndex(); File outputFile = tempFolder.newFile(); @@ -192,8 +195,16 @@ public void testDumpNestedColumnPath() throws Exception Injector injector = Mockito.mock(Injector.class); ObjectMapper mapper = TestHelper.makeJsonMapper(); mapper.registerModules(NestedDataModule.getJacksonModulesList()); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), mapper) + .addValue(DefaultColumnFormatConfig.class, new DefaultColumnFormatConfig(null)) + ); Mockito.when(injector.getInstance(Key.get(ObjectMapper.class, Json.class))).thenReturn(mapper); - List segments = createSegments(helper, tempFolder, closer); + Mockito.when(injector.getInstance(DefaultColumnFormatConfig.class)).thenReturn(new DefaultColumnFormatConfig(null)); + + List segments = createSegments(tempFolder, closer); QueryableIndex queryableIndex = segments.get(0).asQueryableIndex(); File outputFile = tempFolder.newFile(); @@ -236,49 +247,24 @@ public void testGetModules() public static List createSegments( - AggregationTestHelper helper, TemporaryFolder tempFolder, Closer closer ) throws Exception { - File segmentDir = tempFolder.newFolder(); - File inputFile = readFileFromClasspath("nested-test-data.json"); - FileInputStream inputDataStream = new FileInputStream(inputFile); - String parserJson = readFileFromClasspathAsString("nested-test-parser.json"); - String aggJson = readFileFromClasspathAsString("nested-test-aggs.json"); - - helper.createIndex( - inputDataStream, - parserJson, - aggJson, - segmentDir, - 0, + return NestedDataTestUtils.createSegments( + tempFolder, + closer, + "nested-test-data.json", + NestedDataTestUtils.DEFAULT_JSON_INPUT_FORMAT, + new TimestampSpec("timestamp", null, null), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), + null, + new AggregatorFactory[] { + new CountAggregatorFactory("count") + }, Granularities.HOUR, - 1000, - true - ); - - final List segments = Lists.transform( - ImmutableList.of(segmentDir), - dir -> { - try { - return closer.register(new QueryableIndexSegment(helper.getIndexIO().loadIndex(dir), SegmentId.dummy(""))); - } - catch (IOException ex) { - throw new RuntimeException(ex); - } - } + true, + IndexSpec.DEFAULT ); - - return segments; - } - public static File readFileFromClasspath(String fileName) - { - return new File(NestedDataTestUtils.class.getClassLoader().getResource(fileName).getFile()); - } - - public static String readFileFromClasspathAsString(String fileName) throws IOException - { - return com.google.common.io.Files.asCharSource(readFileFromClasspath(fileName), StandardCharsets.UTF_8).read(); } } diff --git a/services/src/test/resources/nested-test-aggs.json b/services/src/test/resources/nested-test-aggs.json deleted file mode 100644 index 6a330e6ccd90..000000000000 --- a/services/src/test/resources/nested-test-aggs.json +++ /dev/null @@ -1,6 +0,0 @@ -[ - { - "type": "count", - "name": "count" - } -] diff --git a/services/src/test/resources/nested-test-parser.json b/services/src/test/resources/nested-test-parser.json deleted file mode 100644 index 9ba5cd0c57bd..000000000000 --- a/services/src/test/resources/nested-test-parser.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "type": "string", - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "timestamp", - "format": "auto" - }, - "dimensionsSpec": { - "dimensions": [ - { - "type": "json", - "name": "nest" - } - ], - "dimensionExclusions": [], - "spatialDimensions": [] - } - } -} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index 3467d71bb627..2895eaeb01e1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -39,6 +39,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.topn.TopNQueryConfig; +import org.apache.druid.segment.DefaultColumnFormatConfig; import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; @@ -485,6 +486,7 @@ public void configure(Binder binder) { binder.bind(DruidOperatorTable.class).in(LazySingleton.class); binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT); + binder.bind(DefaultColumnFormatConfig.class).toInstance(new DefaultColumnFormatConfig(null)); } @Provides