diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java index 3c4263ba9993..dc7d50afd4d4 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java +++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java @@ -34,6 +34,7 @@ public class InputRowSchema private final TimestampSpec timestampSpec; private final DimensionsSpec dimensionsSpec; private final ColumnsFilter columnsFilter; + /** * Set of metric names for further downstream processing by {@link InputSource}. * Empty set if no metric given. diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java index 6e412c30ad8a..6dfaa9ad3d0a 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java @@ -49,7 +49,9 @@ public class DimensionsSpec private final Map dimensionSchemaMap; private final boolean includeAllDimensions; - public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null, false); + private final boolean useNestedColumnIndexerForSchemaDiscovery; + + public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null, false, null); public static List getDefaultSchemas(List dimNames) { @@ -78,7 +80,7 @@ public static Builder builder() public DimensionsSpec(List dimensions) { - this(dimensions, null, null, false); + this(dimensions, null, null, false, null); } @JsonCreator @@ -86,7 +88,8 @@ private DimensionsSpec( @JsonProperty("dimensions") List dimensions, @JsonProperty("dimensionExclusions") List dimensionExclusions, @Deprecated @JsonProperty("spatialDimensions") List spatialDimensions, - @JsonProperty("includeAllDimensions") boolean includeAllDimensions + @JsonProperty("includeAllDimensions") boolean includeAllDimensions, + 
@JsonProperty("useNestedColumnIndexerForSchemaDiscovery") Boolean useNestedColumnIndexerForSchemaDiscovery ) { this.dimensions = dimensions == null @@ -115,6 +118,8 @@ private DimensionsSpec( dimensionSchemaMap.put(newSchema.getName(), newSchema); } this.includeAllDimensions = includeAllDimensions; + this.useNestedColumnIndexerForSchemaDiscovery = + useNestedColumnIndexerForSchemaDiscovery != null && useNestedColumnIndexerForSchemaDiscovery; } @JsonProperty @@ -135,6 +140,12 @@ public boolean isIncludeAllDimensions() return includeAllDimensions; } + @JsonProperty + public boolean useNestedColumnIndexerForSchemaDiscovery() + { + return useNestedColumnIndexerForSchemaDiscovery; + } + @Deprecated @JsonIgnore public List getSpatialDimensions() @@ -188,7 +199,13 @@ public boolean hasCustomDimensions() @PublicApi public DimensionsSpec withDimensions(List dims) { - return new DimensionsSpec(dims, ImmutableList.copyOf(dimensionExclusions), null, includeAllDimensions); + return new DimensionsSpec( + dims, + ImmutableList.copyOf(dimensionExclusions), + null, + includeAllDimensions, + useNestedColumnIndexerForSchemaDiscovery + ); } public DimensionsSpec withDimensionExclusions(Set dimExs) @@ -197,14 +214,21 @@ public DimensionsSpec withDimensionExclusions(Set dimExs) dimensions, ImmutableList.copyOf(Sets.union(dimensionExclusions, dimExs)), null, - includeAllDimensions + includeAllDimensions, + useNestedColumnIndexerForSchemaDiscovery ); } @Deprecated public DimensionsSpec withSpatialDimensions(List spatials) { - return new DimensionsSpec(dimensions, ImmutableList.copyOf(dimensionExclusions), spatials, includeAllDimensions); + return new DimensionsSpec( + dimensions, + ImmutableList.copyOf(dimensionExclusions), + spatials, + includeAllDimensions, + useNestedColumnIndexerForSchemaDiscovery + ); } private void verify(List spatialDimensions) @@ -243,6 +267,7 @@ public boolean equals(Object o) } DimensionsSpec that = (DimensionsSpec) o; return includeAllDimensions == 
that.includeAllDimensions + && useNestedColumnIndexerForSchemaDiscovery == that.useNestedColumnIndexerForSchemaDiscovery && Objects.equals(dimensions, that.dimensions) && Objects.equals(dimensionExclusions, that.dimensionExclusions); } @@ -250,7 +275,12 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(dimensions, dimensionExclusions, includeAllDimensions); + return Objects.hash( + dimensions, + dimensionExclusions, + includeAllDimensions, + useNestedColumnIndexerForSchemaDiscovery + ); } @Override @@ -260,6 +290,7 @@ public String toString() "dimensions=" + dimensions + ", dimensionExclusions=" + dimensionExclusions + ", includeAllDimensions=" + includeAllDimensions + + ", useNestedColumnIndexerForSchemaDiscovery=" + useNestedColumnIndexerForSchemaDiscovery + '}'; } @@ -270,6 +301,8 @@ public static final class Builder private List spatialDimensions; private boolean includeAllDimensions; + private boolean useNestedColumnIndexerForSchemaDiscovery; + public Builder setDimensions(List dimensions) { this.dimensions = dimensions; @@ -301,9 +334,21 @@ public Builder setIncludeAllDimensions(boolean includeAllDimensions) return this; } + public Builder setUseNestedColumnIndexerForSchemaDiscovery(boolean useNestedColumnIndexerForSchemaDiscovery) + { + this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery; + return this; + } + public DimensionsSpec build() { - return new DimensionsSpec(dimensions, dimensionExclusions, spatialDimensions, includeAllDimensions); + return new DimensionsSpec( + dimensions, + dimensionExclusions, + spatialDimensions, + includeAllDimensions, + useNestedColumnIndexerForSchemaDiscovery + ); } } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java index 19c0d685a97b..aa5ecbb08674 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java +++ 
b/core/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java @@ -59,7 +59,13 @@ public class JsonLineReader extends TextReader ) { super(inputRowSchema, source); - this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns)); + this.flattener = ObjectFlatteners.create( + flattenSpec, + new JSONFlattenerMaker( + keepNullColumns, + inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + ) + ); this.mapper = mapper; } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java index 7a7b98c4529a..eaf42244cb32 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java @@ -80,7 +80,13 @@ public class JsonNodeReader extends IntermediateRowParsingReader { this.inputRowSchema = inputRowSchema; this.source = source; - this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns)); + this.flattener = ObjectFlatteners.create( + flattenSpec, + new JSONFlattenerMaker( + keepNullColumns, + inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + ) + ); this.mapper = mapper; this.jsonFactory = new JsonFactory(); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java index 8dee12dc30f6..1b1402f22a12 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java @@ -77,7 +77,13 @@ public class JsonReader extends IntermediateRowParsingReader { this.inputRowSchema = inputRowSchema; this.source = source; - this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns)); + this.flattener = ObjectFlatteners.create( + flattenSpec, + new 
JSONFlattenerMaker( + keepNullColumns, + inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + ) + ); this.mapper = mapper; this.jsonFactory = new JsonFactory(); } diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java index b54b51f86348..0b8244e29b75 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java @@ -57,15 +57,23 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker discoverRootFields(final JsonNode obj) { + // if discovering nested fields, just return all root fields since we want everything + // else, we filter for literals and arrays of literals + if (discoverNestedFields) { + return obj::fieldNames; + } return FluentIterable.from(obj::fields) .filter( entry -> { diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java index 48777a8311a1..281d8442cc22 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java @@ -42,7 +42,10 @@ public class JSONPathParser implements Parser public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper, boolean keepNullColumns) { this.mapper = mapper == null ? 
new ObjectMapper() : mapper; - this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns)); + this.flattener = ObjectFlatteners.create( + flattenSpec, + new JSONFlattenerMaker(keepNullColumns, false) + ); } @Override diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java index 98f087333a55..b2040d73db21 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java @@ -218,7 +218,9 @@ public interface FlattenerMaker { JsonProvider getJsonProvider(); /** - * List all "root" primitive properties and primitive lists (no nested objects, no lists of objects) + * List all "root" fields. If + * {@link org.apache.druid.data.input.impl.DimensionsSpec#useNestedColumnIndexerForSchemaDiscovery} is false, this + * method should filter fields to include only fields that contain primitive and lists of primitive values */ Iterable discoverRootFields(T obj); diff --git a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java index 0a1b09578bed..5583081db10e 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMakerTest.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.node.BinaryNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; import org.junit.Test; @@ -37,7 +38,8 @@ public class JSONFlattenerMakerTest { private static final ObjectMapper OBJECT_MAPPER = new 
ObjectMapper(); - private static final JSONFlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true); + private static final JSONFlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true, false); + private static final JSONFlattenerMaker FLATTENER_MAKER_NESTED = new JSONFlattenerMaker(true, true); @Test public void testStrings() throws JsonProcessingException @@ -169,4 +171,32 @@ public void testNested() throws JsonProcessingException result = FLATTENER_MAKER.finalizeConversionForMap(node); Assert.assertEquals(expectedList, result); } + + @Test + public void testDiscovery() throws JsonProcessingException + { + Map theMap = + ImmutableMap.builder() + .put("bool", true) + .put("int", 1) + .put("long", 1L) + .put("float", 0.11f) + .put("double", 0.33) + .put("binary", new byte[]{0x01, 0x02, 0x03}) + .put("list", ImmutableList.of("foo", "bar", "baz")) + .put("anotherList", ImmutableList.of(1, 2, 3)) + .put("nested", ImmutableMap.of("x", 1L, "y", 2L, "z", 3L)) + .build(); + + JsonNode node = OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(theMap)); + Assert.assertTrue(node.isObject()); + Assert.assertEquals( + ImmutableSet.of("bool", "int", "long", "float", "double", "binary", "list", "anotherList"), + ImmutableSet.copyOf(FLATTENER_MAKER.discoverRootFields(node)) + ); + Assert.assertEquals( + ImmutableSet.of("bool", "int", "long", "float", "double", "binary", "list", "anotherList", "nested"), + ImmutableSet.copyOf(FLATTENER_MAKER_NESTED.discoverRootFields(node)) + ); + } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java index e0b0fbcc510c..ab6c50ef188d 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java @@ -34,7 +34,7 @@ public class ObjectFlattenersTest { private static final 
String SOME_JSON = "{\"foo\": null, \"bar\": 1}"; - private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true); + private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true, false); private static final ObjectFlattener FLATTENER = ObjectFlatteners.create( new JSONPathSpec( true, diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index 4e254c18eef2..ba9d895b1f9b 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -92,14 +92,27 @@ private static boolean isFieldPrimitive(Schema.Field field) private final boolean fromPigAvroStorage; private final boolean binaryAsString; + private final boolean discoverNestedFields; + /** - * @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage - * @param binaryAsString boolean to encode the byte[] as a string. + * @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage + * @param binaryAsString if true, treat byte[] as utf8 encoded values and coerce to strings, else leave as byte[] + * @param extractUnionsByType if true, unions will be extracted to separate nested fields for each type. 
See + * {@link GenericAvroJsonProvider#extractUnionTypes(Object)} for more details + * @param discoverNestedFields if true, {@link #discoverRootFields(GenericRecord)} will return the full set of + * fields, else this list will be filtered to contain only simple literals and arrays + * of simple literals */ - public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString, final boolean extractUnionsByType) + public AvroFlattenerMaker( + final boolean fromPigAvroStorage, + final boolean binaryAsString, + final boolean extractUnionsByType, + final boolean discoverNestedFields + ) { this.fromPigAvroStorage = fromPigAvroStorage; this.binaryAsString = binaryAsString; + this.discoverNestedFields = discoverNestedFields; this.avroJsonProvider = new GenericAvroJsonProvider(extractUnionsByType); this.jsonPathConfiguration = @@ -113,6 +126,11 @@ public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binary @Override public Set discoverRootFields(final GenericRecord obj) { + // if discovering nested fields, just return all root fields since we want everything + // else, we filter for literals and arrays of literals + if (discoverNestedFields) { + return obj.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toSet()); + } return obj.getSchema() .getFields() .stream() diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java index 66552f9eda05..3039fbb7c843 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java @@ -64,7 +64,15 @@ public class AvroOCFReader extends IntermediateRowParsingReader this.source = source; this.temporaryDirectory = temporaryDirectory; this.readerSchema = readerSchema; - 
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType)); + this.recordFlattener = ObjectFlatteners.create( + flattenSpec, + new AvroFlattenerMaker( + false, + binaryAsString, + extractUnionsByType, + inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + ) + ); } @Override diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java index 6d399a8dc679..12f8a20bab7b 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java @@ -21,6 +21,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -50,7 +51,17 @@ public static ObjectFlattener makeFlattener( flattenSpec = JSONPathSpec.DEFAULT; } - return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString, extractUnionsByType)); + final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec(); + + return ObjectFlatteners.create( + flattenSpec, + new AvroFlattenerMaker( + fromPigAvroStorage, + binaryAsString, + extractUnionsByType, + dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + ) + ); } public static List parseGenericRecord( diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java index 4ed6a8a2ab26..51bc733d4305 100644 --- 
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java @@ -60,7 +60,15 @@ public class AvroStreamReader extends IntermediateRowParsingReader settableByteEntitySource = (SettableByteEntity) source; - InputRowSchema newInputRowSchema = new InputRowSchema(dummyTimestampSpec, inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter()); + InputRowSchema newInputRowSchema = new InputRowSchema( + dummyTimestampSpec, + inputRowSchema.getDimensionsSpec(), + inputRowSchema.getColumnsFilter(), + inputRowSchema.getMetricNames() + ); return new KafkaInputReader( inputRowSchema, settableByteEntitySource, diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java index 9fcefcba2a60..5ab8cda96848 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.ParseSpec; @@ -55,7 +56,14 @@ public OrcHadoopInputRowParser( } else { flattenSpec = JSONPathSpec.DEFAULT; } - this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(this.binaryAsString)); + final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec(); + this.orcStructFlattener = ObjectFlatteners.create( + 
flattenSpec, + new OrcStructFlattenerMaker( + this.binaryAsString, + dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + ) + ); this.parser = new MapInputRowParser(parseSpec); } diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java index 5bcec8c80eb1..89ba536562e0 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java @@ -70,7 +70,13 @@ public class OrcReader extends IntermediateRowParsingReader this.inputRowSchema = inputRowSchema; this.source = source; this.temporaryDirectory = temporaryDirectory; - this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(binaryAsString)); + this.orcStructFlattener = ObjectFlatteners.create( + flattenSpec, + new OrcStructFlattenerMaker( + binaryAsString, + inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + ) + ); } @Override diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java index ee770d3e8e20..016ddb3f90c4 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java @@ -43,7 +43,9 @@ public class OrcStructFlattenerMaker implements ObjectFlatteners.FlattenerMaker< private final JsonProvider orcJsonProvider; private final OrcStructConverter converter; - OrcStructFlattenerMaker(boolean binaryAsString) + private final boolean discoverNestedFields; + + OrcStructFlattenerMaker(boolean binaryAsString, boolean 
discoverNestedFields) { this.converter = new OrcStructConverter(binaryAsString); this.orcJsonProvider = new OrcStructJsonProvider(converter); @@ -52,11 +54,17 @@ public class OrcStructFlattenerMaker implements ObjectFlatteners.FlattenerMaker< .mappingProvider(new NotImplementedMappingProvider()) .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) .build(); + this.discoverNestedFields = discoverNestedFields; } @Override public Iterable discoverRootFields(OrcStruct obj) { + // if discovering nested fields, just return all root fields since we want everything + // else, we filter for literals and arrays of literals + if (discoverNestedFields) { + return obj.getSchema().getFieldNames(); + } List fields = obj.getSchema().getFieldNames(); List children = obj.getSchema().getChildren(); List primitiveFields = new ArrayList<>(); diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java index 03083f61ebc8..cf8ba19d9b74 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java @@ -52,6 +52,7 @@ import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.List; public class OrcReaderTest extends InitializedNullHandlingTest { @@ -421,6 +422,109 @@ public void testNestedColumn() throws IOException } } + @Test + public void testNestedColumnSchemaless() throws IOException + { + final OrcInputFormat inputFormat = new OrcInputFormat( + new JSONPathSpec(true, ImmutableList.of()), + null, + new Configuration() + ); + final InputRowSchema schema = new InputRowSchema( + new TimestampSpec("ts", "millis", null), + DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + ColumnsFilter.all(), + null + ); + final FileEntity entity = 
new FileEntity(new File("example/orc-file-11-format.orc")); + + final InputEntityReader reader = inputFormat.createReader(schema, entity, temporaryFolder.newFolder()); + + List dims = ImmutableList.of( + "boolean1", + "byte1", + "short1", + "int1", + "long1", + "float1", + "double1", + "bytes1", + "string1", + "middle", + "list", + "map", + "ts", + "decimal1" + ); + try (CloseableIterator iterator = reader.read()) { + int actualRowCount = 0; + + // Check the first row + Assert.assertTrue(iterator.hasNext()); + InputRow row = iterator.next(); + + Assert.assertEquals(dims, row.getDimensions()); + actualRowCount++; + Assert.assertEquals( + ImmutableMap.of( + "list", + ImmutableList.of( + ImmutableMap.of("int1", 1, "string1", "bye"), + ImmutableMap.of("int1", 2, "string1", "sigh") + ) + ), + row.getRaw("middle") + ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("int1", 3, "string1", "good"), + ImmutableMap.of("int1", 4, "string1", "bad") + ), + row.getRaw("list") + ); + Assert.assertEquals( + ImmutableMap.of(), + row.getRaw("map") + ); + Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), row.getTimestamp()); + + while (iterator.hasNext()) { + actualRowCount++; + row = iterator.next(); + Assert.assertEquals(dims, row.getDimensions()); + } + + // Check the last row + Assert.assertEquals( + ImmutableMap.of( + "list", + ImmutableList.of( + ImmutableMap.of("int1", 1, "string1", "bye"), + ImmutableMap.of("int1", 2, "string1", "sigh") + ) + ), + row.getRaw("middle") + ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("int1", 100000000, "string1", "cat"), + ImmutableMap.of("int1", -100000, "string1", "in"), + ImmutableMap.of("int1", 1234, "string1", "hat") + ), + row.getRaw("list") + ); + Assert.assertEquals( + ImmutableMap.of( + "chani", ImmutableMap.of("int1", 5, "string1", "chani"), + "mauddib", ImmutableMap.of("int1", 1, "string1", "mauddib") + ), + row.getRaw("map") + ); + + Assert.assertEquals(7500, actualRowCount); + } + } + @Test 
public void testListMap() throws IOException { diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java index aced1106eefa..494f66251633 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java @@ -66,7 +66,13 @@ public class ParquetReader extends IntermediateRowParsingReader this.inputRowSchema = inputRowSchema; this.source = source; this.temporaryDirectory = temporaryDirectory; - this.flattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(binaryAsString)); + this.flattener = ObjectFlatteners.create( + flattenSpec, + new ParquetGroupFlattenerMaker( + binaryAsString, + inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + ) + ); } @Override diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java index 796891ea8ef8..5c2dd2844f19 100755 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java @@ -32,6 +32,7 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.avro.AvroFlattenerMaker; import org.apache.druid.data.input.avro.AvroParseSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.ParseSpec; import 
org.apache.druid.data.input.impl.TimestampSpec; @@ -76,9 +77,16 @@ public ParquetAvroHadoopInputRowParser( flattenSpec = JSONPathSpec.DEFAULT; } + final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec(); + this.recordFlattener = ObjectFlatteners.create( flattenSpec, - new AvroFlattenerMaker(false, this.binaryAsString, this.extractUnionsByType) + new AvroFlattenerMaker( + false, + this.binaryAsString, + this.extractUnionsByType, + dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + ) ); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java index 772505546271..a243107cc238 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupFlattenerMaker.java @@ -41,7 +41,9 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMak private final ParquetGroupConverter converter; private final JsonProvider parquetJsonProvider; - public ParquetGroupFlattenerMaker(boolean binaryAsString) + private final boolean discoverNestedFields; + + public ParquetGroupFlattenerMaker(boolean binaryAsString, boolean discoverNestedFields) { this.converter = new ParquetGroupConverter(binaryAsString); this.parquetJsonProvider = new ParquetGroupJsonProvider(converter); @@ -50,11 +52,17 @@ public ParquetGroupFlattenerMaker(boolean binaryAsString) .mappingProvider(new NotImplementedMappingProvider()) .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) .build(); + this.discoverNestedFields = discoverNestedFields; } @Override public Set discoverRootFields(Group obj) { + // if discovering nested fields, just return all root fields since we want 
everything + // else, we filter for literals and arrays of literals + if (discoverNestedFields) { + return obj.getType().getFields().stream().map(Type::getName).collect(Collectors.toSet()); + } return obj.getType() .getFields() .stream() diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java index 762bb709d1c5..f3e853968511 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.ParseSpec; @@ -54,7 +55,14 @@ public ParquetHadoopInputRowParser( } else { flattenSpec = JSONPathSpec.DEFAULT; } - this.groupFlattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(this.binaryAsString)); + final DimensionsSpec dimensionsSpec = parseSpec.getDimensionsSpec(); + this.groupFlattener = ObjectFlatteners.create( + flattenSpec, + new ParquetGroupFlattenerMaker( + this.binaryAsString, + dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + ) + ); this.parser = new MapInputRowParser(parseSpec); } diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java 
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java index 387ff2cf9194..950039d06956 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java @@ -162,4 +162,77 @@ public void testNestedColumnTransformsNestedNullableListFile() throws IOExceptio Assert.assertEquals(1L, rows.get(0).getRaw("t_a2_1_b1")); Assert.assertEquals(1L, rows.get(0).getRaw("tt_a2_0_b1")); } + + @Test + public void testNestedColumnSchemalessNestedTestFileNoNested() throws IOException + { + final String file = "example/flattening/test_nested_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(false).build(), + ColumnsFilter.all(), + null + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of()); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals(ImmutableList.of("dim1", "metric1", "timestamp"), rows.get(0).getDimensions()); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals(ImmutableList.of("d1v1"), rows.get(0).getDimension("dim1")); + Assert.assertEquals("d1v1", rows.get(0).getRaw("dim1")); + Assert.assertEquals(ImmutableList.of("1"), rows.get(0).getDimension("metric1")); + Assert.assertEquals(1, rows.get(0).getRaw("metric1")); + Assert.assertEquals(1, rows.get(0).getMetric("metric1")); + // can still read even if it doesn't get reported as a dimension + Assert.assertEquals( + ImmutableMap.of( + "listDim", ImmutableList.of("listDim1v1", "listDim1v2"), + "dim3", 1, + "dim2", "d2v1", + "metric2", 2 + ), + rows.get(0).getRaw("nestedData") + 
); + } + + @Test + public void testNestedColumnSchemalessNestedTestFile() throws IOException + { + final String file = "example/flattening/test_nested_1.parquet"; + InputRowSchema schema = new InputRowSchema( + new TimestampSpec("timestamp", "auto", null), + DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + ColumnsFilter.all(), + null + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, ImmutableList.of()); + InputEntityReader reader = createReader( + file, + schema, + flattenSpec + ); + + List rows = readAllRows(reader); + Assert.assertEquals(ImmutableList.of("nestedData", "dim1", "metric1", "timestamp"), rows.get(0).getDimensions()); + Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString()); + Assert.assertEquals(ImmutableList.of("d1v1"), rows.get(0).getDimension("dim1")); + Assert.assertEquals("d1v1", rows.get(0).getRaw("dim1")); + Assert.assertEquals(ImmutableList.of("1"), rows.get(0).getDimension("metric1")); + Assert.assertEquals(1, rows.get(0).getRaw("metric1")); + Assert.assertEquals(1, rows.get(0).getMetric("metric1")); + Assert.assertEquals( + ImmutableMap.of( + "listDim", ImmutableList.of("listDim1v1", "listDim1v2"), + "dim3", 1, + "dim2", "d2v1", + "metric2", 2 + ), + rows.get(0).getRaw("nestedData") + ); + } } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java index a3314e42731d..a3d69226fd48 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java @@ -56,6 +56,13 @@ public class ProtobufFlattenerMaker implements ObjectFlatteners.FlattenerMaker discoverRootFields(Map obj) { - // in the future we can 
just return obj.keySet(), but for now this doesnt expect nested fields... + // if discovering nested fields, just return all root fields since we want everything + // else, we filter for literals and arrays of literals + if (discoverNestedFields) { + return obj.keySet(); + } Set rootFields = Sets.newHashSetWithExpectedSize(obj.keySet().size()); for (Map.Entry entry : obj.entrySet()) { if (entry.getValue() instanceof List || entry.getValue() instanceof Map) { diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java index 9bb50a247b3e..7c0881b69c82 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowSchema.java @@ -40,7 +40,12 @@ public class ProtobufInputRowSchema extends InputRowSchema { public ProtobufInputRowSchema(InputRowSchema inputRowSchema) { - super(new ProtobufTimestampSpec(inputRowSchema.getTimestampSpec()), inputRowSchema.getDimensionsSpec(), inputRowSchema.getColumnsFilter()); + super( + new ProtobufTimestampSpec(inputRowSchema.getTimestampSpec()), + inputRowSchema.getDimensionsSpec(), + inputRowSchema.getColumnsFilter(), + inputRowSchema.getMetricNames() + ); } static class ProtobufTimestampSpec extends TimestampSpec @@ -51,9 +56,9 @@ public ProtobufTimestampSpec(TimestampSpec timestampSpec) } /** - * Extracts the timestamp from the record. If the timestamp column is of complex type such as {@link Timestamp}, then the timestamp - * is first serialized to string via {@link JsonFormat}. Directly calling {@code toString()} on {@code Timestamp} - * returns an unparseable string. + * Extracts the timestamp from the record. 
If the timestamp column is of complex type such as {@link Timestamp}, + * then the timestamp is first serialized to string via {@link JsonFormat}. Directly calling {@code toString()} + * on {@code Timestamp} returns an unparseable string. */ @Override @Nullable diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java index 2dc6aa1f8caa..ac77a545dd7b 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -57,7 +57,10 @@ public class ProtobufReader extends IntermediateRowParsingReader ) { this.inputRowSchema = inputRowSchema; - this.recordFlattener = ObjectFlatteners.create(flattenSpec, new ProtobufFlattenerMaker()); + this.recordFlattener = ObjectFlatteners.create( + flattenSpec, + new ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()) + ); this.source = source; this.protobufBytesDecoder = protobufBytesDecoder; } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index 0b5d096449c0..9255e1e7ebd2 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -328,6 +328,77 @@ public void testParseNestedData() throws Exception } + @Test + public void testParseNestedDataSchemaless() throws Exception + { + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat( + 
JSONPathSpec.DEFAULT, + decoder + ); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime); + + final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); + + InputEntityReader reader = protobufInputFormat.createReader( + new InputRowSchema( + timestampSpec, + DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + null, + null + ), + entity, + null + ); + + TransformSpec transformSpec = new TransformSpec( + null, + Lists.newArrayList( + new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE) + ) + ); + TransformingInputEntityReader transformingReader = new TransformingInputEntityReader( + reader, + transformSpec.toTransformer() + ); + + + InputRow row = transformingReader.read().next(); + + Assert.assertEquals( + ImmutableList.of( + "someOtherId", + "bar", + "someIntColumn", + "isValid", + "foo", + "description", + "someLongColumn", + "someFloatColumn", + "eventType", + "id", + "someBytesColumn", + "timestamp" + ), + row.getDimensions() + ); + + Assert.assertEquals(ImmutableMap.of("bar", "baz"), row.getRaw("foo")); + Assert.assertEquals( + ImmutableList.of(ImmutableMap.of("bar", "bar0"), ImmutableMap.of("bar", "bar1")), + row.getRaw("bar") + ); + Assert.assertArrayEquals( + new byte[]{0x01, 0x02, 0x03, 0x04}, + (byte[]) row.getRaw("someBytesColumn") + ); + ProtobufInputRowParserTest.verifyNestedData(row, dateTime); + + } + @Test public void testParseNestedDataTransformsOnly() throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java index 
aaa2cd07898b..8824decc05fb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java @@ -22,10 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputSource; import org.apache.druid.java.util.common.HumanReadableBytes; -import org.apache.druid.segment.indexing.DataSchema; import javax.annotation.Nullable; @@ -35,13 +32,9 @@ public class SamplerConfig private static final int MAX_NUM_ROWS = 5000; private static final int DEFAULT_TIMEOUT_MS = 10000; - - private final int numRows; private final int timeoutMs; - private final long maxBytesInMemory; - private final long maxClientResponseBytes; @JsonCreator @@ -92,13 +85,13 @@ public int getTimeoutMs() /** * Maximum number of bytes in memory that the {@link org.apache.druid.segment.incremental.IncrementalIndex} used by - * {@link InputSourceSampler#sample(InputSource, InputFormat, DataSchema, SamplerConfig)} will be allowed to - * accumulate before aborting sampling. Particularly useful for limiting footprint of sample operations as well as - * overall response size from sample requests. However, it is not directly correlated to response size since it - * also contains the "raw" input data, so actual responses will likely be at least twice the size of this value, - * depending on factors such as number of transforms, aggregations in the case of rollup, whether all columns - * of the input are present in the dimension spec, and so on. If it is preferred to control client response size, - * use {@link SamplerConfig#getMaxClientResponseBytes()} instead. 
+ * {@link InputSourceSampler#sample(org.apache.druid.data.input.InputSource, org.apache.druid.data.input.InputFormat, org.apache.druid.segment.indexing.DataSchema, SamplerConfig)} + * will be allowed to accumulate before aborting sampling. Particularly useful for limiting footprint of sample + * operations as well as overall response size from sample requests. However, it is not directly correlated to + * response size since it also contains the "raw" input data, so actual responses will likely be at least twice the + * size of this value, depending on factors such as number of transforms, aggregations in the case of rollup, whether + * all columns of the input are present in the dimension spec, and so on. If it is preferred to control client + * response size, use {@link SamplerConfig#getMaxClientResponseBytes()} instead. */ public long getMaxBytesInMemory() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index b3db2b71d1b9..c6b6d6fdf0a4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -252,7 +252,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException new ParallelIndexTuningConfig( null, null, - new OnheapIncrementalIndex.Spec(true, false), + new OnheapIncrementalIndex.Spec(true), 40000, 2000L, null, @@ -316,7 +316,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException ), new ClientCompactionTaskQueryTuningConfig( 100, - new OnheapIncrementalIndex.Spec(true, false), + new OnheapIncrementalIndex.Spec(true), 40000, 2000L, 30000L, diff --git a/integration-tests-ex/cases/cluster/Common/dependencies.yaml 
b/integration-tests-ex/cases/cluster/Common/dependencies.yaml index ccfce630a2a5..4eeb85a49cd6 100644 --- a/integration-tests-ex/cases/cluster/Common/dependencies.yaml +++ b/integration-tests-ex/cases/cluster/Common/dependencies.yaml @@ -67,7 +67,8 @@ services: # See https://hub.docker.com/_/mysql # The image will intialize the user and DB upon first start. metadata: - # platform: linux/x86_64 - Add when running on M1 Macs + # Uncomment the following when running on M1 Macs: + # platform: linux/x86_64 image: mysql:$MYSQL_IMAGE_VERSION container_name: metadata command: diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java index 86e0ece8d525..58e47bbe7eb4 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/MetastoreClient.java @@ -26,7 +26,6 @@ import org.skife.jdbi.v2.Handle; import javax.inject.Inject; - import java.sql.Connection; import java.sql.SQLException; diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java index c14ea745aa81..e23c996c8844 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/IntegrationTestingConfigEx.java @@ -25,7 +25,6 @@ import org.apache.druid.testing.IntegrationTestingConfigProvider; import javax.inject.Inject; - import java.util.Map; import java.util.Properties; diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java index 144722eea9b8..7b983b9cf1e4 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java @@ -155,14 +155,23 @@ public String getStringFromFileAndReplaceDatasource(String filePath, String data return fileString; } + protected void doTestQuery(String dataSource, String queryFilePath) + { + doTestQuery(dataSource, queryFilePath, false); + } + /** * Reads native queries from a file and runs against the provided datasource. */ - protected void doTestQuery(String dataSource, String queryFilePath) + protected void doTestQuery(String dataSource, String queryFilePath, boolean isSql) { try { String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource); - queryHelper.testQueriesFromString(query); + if (isSql) { + sqlQueryHelper.testQueriesFromString(query); + } else { + queryHelper.testQueriesFromString(query); + } } catch (Exception e) { LOG.error(e, "Error while running test query at path " + queryFilePath); @@ -255,6 +264,31 @@ protected void doIndexTest( queryFilePath, waitForNewVersion, runTestQueries, + false, + waitForSegmentsToLoad, + segmentAvailabilityConfirmationPair + ); + } + + protected void doIndexTest( + String dataSource, + String indexTaskFilePath, + Function taskSpecTransform, + String queryFilePath, + boolean waitForNewVersion, + boolean runTestQueries, + boolean waitForSegmentsToLoad, + Pair segmentAvailabilityConfirmationPair + ) throws IOException + { + doIndexTest( + dataSource, + indexTaskFilePath, + taskSpecTransform, + queryFilePath, + waitForNewVersion, + runTestQueries, + false, waitForSegmentsToLoad, segmentAvailabilityConfirmationPair ); @@ -267,6 +301,7 @@ protected void doIndexTest( String queryFilePath, boolean waitForNewVersion, boolean 
runTestQueries, + boolean isSqlQueries, boolean waitForSegmentsToLoad, Pair segmentAvailabilityConfirmationPair ) throws IOException @@ -288,7 +323,7 @@ protected void doIndexTest( segmentAvailabilityConfirmationPair ); if (runTestQueries) { - doTestQuery(dataSource, queryFilePath); + doTestQuery(dataSource, queryFilePath, isSqlQueries); } } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java index ce8a9f5c13cd..868ffdb8e530 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractLocalInputSourceParallelIndexTest.java @@ -26,6 +26,7 @@ import javax.annotation.Nonnull; import java.io.Closeable; +import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.function.Function; @@ -48,6 +49,28 @@ public void doIndexTest( @Nonnull Map extraInputFormatMap, Pair segmentAvailabilityConfirmationPair ) throws Exception + { + doIndexTest( + inputFormatDetails, + INDEX_TASK, + INDEX_QUERIES_RESOURCE, + false, + Collections.emptyMap(), + extraInputFormatMap, + segmentAvailabilityConfirmationPair + ); + } + + + public void doIndexTest( + InputFormatDetails inputFormatDetails, + String ingestSpecTemplate, + String queries, + boolean useSqlQueries, + @Nonnull Map templateValues, + @Nonnull Map extraInputFormatMap, + Pair segmentAvailabilityConfirmationPair + ) throws Exception { final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); Map inputFormatMap = new ImmutableMap.Builder().putAll(extraInputFormatMap) @@ -58,6 +81,13 @@ public void doIndexTest( ) { final Function sqlInputSourcePropsTransform = spec -> { try { + for (Map.Entry entry : templateValues.entrySet()) 
{ + spec = StringUtils.replace( + spec, + "%%" + entry.getKey() + "%%", + jsonMapper.writeValueAsString(entry.getValue()) + ); + } spec = StringUtils.replace( spec, "%%PARTITIONS_SPEC%%", @@ -102,11 +132,12 @@ public void doIndexTest( doIndexTest( indexDatasource, - INDEX_TASK, + ingestSpecTemplate, sqlInputSourcePropsTransform, - INDEX_QUERIES_RESOURCE, + queries, false, true, + useSqlQueries, true, segmentAvailabilityConfirmationPair ); diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java new file mode 100644 index 000000000000..61deca75ed88 --- /dev/null +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllFormatSchemalessTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.InputFormat; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@RunWith(DruidTestRunner.class) +@Category(InputFormat.class) +public class ITLocalInputSourceAllFormatSchemalessTest extends AbstractLocalInputSourceParallelIndexTest +{ + private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task_schemaless.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_schemaless_queries.json"; + + @Test + public void testAvroInputFormatIndexDataIngestionSpecWithFileSchemaSchemaless() throws Exception + { + List fieldList = ImmutableList.of( + ImmutableMap.of("name", "timestamp", "type", "string"), + ImmutableMap.of("name", "page", "type", "string"), + ImmutableMap.of("name", "language", "type", "string"), + ImmutableMap.of("name", "user", "type", "string"), + ImmutableMap.of("name", "unpatrolled", "type", "string"), + ImmutableMap.of("name", "newPage", "type", "string"), + ImmutableMap.of("name", "robot", "type", "string"), + ImmutableMap.of("name", "anonymous", "type", "string"), + ImmutableMap.of("name", "namespace", "type", "string"), + ImmutableMap.of("name", "continent", "type", "string"), + ImmutableMap.of("name", "country", "type", "string"), + ImmutableMap.of("name", "region", "type", "string"), + ImmutableMap.of("name", "city", "type", "string"), + ImmutableMap.of("name", "added", "type", "int"), + ImmutableMap.of("name", "deleted", "type", "int"), + ImmutableMap.of("name", "delta", "type", "int") + ); + Map schema = ImmutableMap.of("namespace", 
"org.apache.druid.data.input", + "type", "record", + "name", "wikipedia", + "fields", fieldList); + doIndexTest( + AbstractITBatchIndexTest.InputFormatDetails.AVRO, + INDEX_TASK, + INDEX_QUERIES_RESOURCE, + true, + ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true), + ImmutableMap.of("schema", schema), + new Pair<>(false, false) + ); + } + + @Test + public void testAvroInputFormatIndexDataIngestionSpecNoFileSchemaSchemaless() throws Exception + { + doIndexTest( + AbstractITBatchIndexTest.InputFormatDetails.AVRO, + INDEX_TASK, + INDEX_QUERIES_RESOURCE, + true, + ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true), + Collections.emptyMap(), + new Pair<>(false, false) + ); + } + + @Test + public void testJsonInputFormatIndexDataIngestionSpecSchemaless() throws Exception + { + doIndexTest( + AbstractITBatchIndexTest.InputFormatDetails.JSON, + INDEX_TASK, + INDEX_QUERIES_RESOURCE, + true, + ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true), + Collections.emptyMap(), + new Pair<>(false, false) + ); + } + + @Test + public void testParquetInputFormatIndexDataIngestionSpecSchemaless() throws Exception + { + doIndexTest( + AbstractITBatchIndexTest.InputFormatDetails.PARQUET, + INDEX_TASK, + INDEX_QUERIES_RESOURCE, + true, + ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true), + Collections.emptyMap(), + new Pair<>(false, false) + ); + } + + @Test + public void testOrcInputFormatIndexDataIngestionSpecSchemaless() throws Exception + { + doIndexTest( + AbstractITBatchIndexTest.InputFormatDetails.ORC, + INDEX_TASK, + INDEX_QUERIES_RESOURCE, + true, + ImmutableMap.of("USE_NESTED_COLUMN_INDEXER", true), + Collections.emptyMap(), + new Pair<>(false, false) + ); + } +} diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java index 8482952db243..641d385865fa 100644 --- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITLocalInputSourceAllInputFormatTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.tests.indexer.AbstractLocalInputSourceParallelIndexTest; import org.apache.druid.testsEx.categories.InputFormat; import org.apache.druid.testsEx.config.DruidTestRunner; import org.junit.Test; diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java index 73fc73d42d87..4f19a226d9b8 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITOverwriteBatchIndexTest.java @@ -159,6 +159,7 @@ private void submitIngestionTaskAndVerify( null, false, false, + false, true, new Pair<>(false, false) ); diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java index 735bc0a50ff7..60d22f3f9f8f 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/leadership/ITHighAvailabilityTest.java @@ -32,11 +32,11 @@ import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; import org.apache.druid.testing.utils.SqlTestQueryHelper; -import org.apache.druid.tests.indexer.AbstractIndexerTest; import 
org.apache.druid.testsEx.categories.HighAvailability; import org.apache.druid.testsEx.cluster.DruidClusterClient; import org.apache.druid.testsEx.config.DruidTestRunner; import org.apache.druid.testsEx.config.Initializer; +import org.apache.druid.testsEx.indexer.AbstractIndexerTest; import org.apache.druid.testsEx.utils.DruidClusterAdminClient; import org.junit.Test; import org.junit.experimental.categories.Category; diff --git a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json index 928effe65e97..62c3ac637159 100644 --- a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json +++ b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_queries.json @@ -1,6 +1,6 @@ [ { - "description": "timeseries, 1 agg, all", + "description": "timeboundary", "query":{ "queryType" : "timeBoundary", "dataSource": "%%DATASOURCE%%" diff --git a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json new file mode 100644 index 000000000000..316ec5fd8ae4 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_index_schemaless_queries.json @@ -0,0 +1,32 @@ +[ + { + "description": "select all things", + "query": { + "query": "SELECT \"__time\",\"continent\",\"country\",\"city\",\"added\",\"unpatrolled\",\"delta\",\"language\",\"robot\",\"deleted\",\"newPage\",\"namespace\",\"anonymous\",\"page\",\"region\",\"user\" FROM \"%%DATASOURCE%%\" ORDER BY __time" + }, + "expectedResults": [ + {"__time":"2013-08-31T01:02:33.000Z","continent":"North America","country":"United States","city":"San Francisco","added":57,"unpatrolled":"true","delta":-143,"language":"en","robot":"false","deleted":200,"newPage":"true","namespace":"article","anonymous":"false","page":"Gypsy 
Danger","region":"Bay Area","user":"nuclear"}, + {"__time":"2013-08-31T03:32:45.000Z","continent":"Australia","country":"Australia","city":"Syndey","added":459,"unpatrolled":"false","delta":330,"language":"en","robot":"true","deleted":129,"newPage":"true","namespace":"wikipedia","anonymous":"false","page":"Striker Eureka","region":"Cantebury","user":"speed"}, + {"__time":"2013-08-31T07:11:21.000Z","continent":"Asia","country":"Russia","city":"Moscow","added":123,"unpatrolled":"false","delta":111,"language":"ru","robot":"true","deleted":12,"newPage":"true","namespace":"article","anonymous":"false","page":"Cherno Alpha","region":"Oblast","user":"masterYi"}, + {"__time":"2013-08-31T11:58:39.000Z","continent":"Asia","country":"China","city":"Taiyuan","added":905,"unpatrolled":"true","delta":900,"language":"zh","robot":"true","deleted":5,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Crimson Typhoon","region":"Shanxi","user":"triplets"}, + {"__time":"2013-08-31T12:41:27.000Z","continent":"Asia","country":"Japan","city":"Tokyo","added":1,"unpatrolled":"true","delta":-9,"language":"ja","robot":"true","deleted":10,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Coyote Tango","region":"Kanto","user":"stringer"}, + {"__time":"2013-09-01T01:02:33.000Z","continent":"North America","country":"United States","city":"San Francisco","added":57,"unpatrolled":"true","delta":-143,"language":"en","robot":"false","deleted":200,"newPage":"true","namespace":"article","anonymous":"false","page":"Gypsy Danger","region":"Bay Area","user":"nuclear"}, + {"__time":"2013-09-01T03:32:45.000Z","continent":"Australia","country":"Australia","city":"Syndey","added":459,"unpatrolled":"false","delta":330,"language":"en","robot":"true","deleted":129,"newPage":"true","namespace":"wikipedia","anonymous":"false","page":"Striker Eureka","region":"Cantebury","user":"speed"}, + 
{"__time":"2013-09-01T07:11:21.000Z","continent":"Asia","country":"Russia","city":"Moscow","added":123,"unpatrolled":"false","delta":111,"language":"ru","robot":"true","deleted":12,"newPage":"true","namespace":"article","anonymous":"false","page":"Cherno Alpha","region":"Oblast","user":"masterYi"}, + {"__time":"2013-09-01T11:58:39.000Z","continent":"Asia","country":"China","city":"Taiyuan","added":905,"unpatrolled":"true","delta":900,"language":"zh","robot":"true","deleted":5,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Crimson Typhoon","region":"Shanxi","user":"triplets"}, + {"__time":"2013-09-01T12:41:27.000Z","continent":"Asia","country":"Japan","city":"Tokyo","added":1,"unpatrolled":"true","delta":-9,"language":"ja","robot":"true","deleted":10,"newPage":"false","namespace":"wikipedia","anonymous":"false","page":"Coyote Tango","region":"Kanto","user":"stringer"} + ] + }, + { + "description": "simple group by", + "query": { + "query": "SELECT page, SUM(added) as added FROM \"%%DATASOURCE%%\" WHERE continent = 'Asia' GROUP BY 1 ORDER BY 2 DESC" + }, + "expectedResults": [ + {"page":"Crimson Typhoon","added":1810}, + {"page":"Cherno Alpha","added":246}, + {"page":"Coyote Tango","added":2} + ] + } + +] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json new file mode 100644 index 000000000000..bc96c730ee77 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json @@ -0,0 +1,42 @@ +{ + "type": "index_parallel", + "spec": { + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [], + "useNestedColumnIndexerForSchemaDiscovery": %%USE_NESTED_COLUMN_INDEXER%% + }, + "metricsSpec": [], 
+ "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "second", + "intervals" : [ "2013-08-31/2013-09-02" ] + } + }, + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "local", + "filter" : "%%INPUT_SOURCE_FILTER%%", + "baseDir": "%%INPUT_SOURCE_BASE_DIR%%" + }, + "appendToExisting": %%APPEND_TO_EXISTING%%, + "dropExisting": %%DROP_EXISTING%%, + "inputFormat": %%INPUT_FORMAT%% + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumConcurrentSubTasks": 4, + "splitHintSpec": { + "type": "maxSize", + "maxNumFiles": 1 + }, + "forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%, + "partitionsSpec": %%PARTITIONS_SPEC%% + } + } +} \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java index 887c931204e8..1269fe1e6b3c 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java @@ -42,8 +42,6 @@ public abstract class AppendableIndexBuilder protected boolean preserveExistingMetrics = false; protected boolean useMaxMemoryEstimates = true; - protected boolean useNestedColumnIndexerForSchemaDiscovery = false; - protected final Logger log = new Logger(this.getClass()); public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema) @@ -132,14 +130,6 @@ public AppendableIndexBuilder setUseMaxMemoryEstimates(final boolean useMaxMemor return this; } - public AppendableIndexBuilder setUseNestedColumnIndexerForSchemaDiscovery( - boolean useNestedColumnIndexerForSchemaDiscovery - ) - { - this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery; - return this; - } - public void validate() { if (maxRowCount <= 0) { diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java index 3af6bfa4cbe8..67cdabdf5673 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java @@ -32,7 +32,4 @@ public interface AppendableIndexSpec // Returns the default max bytes in memory for this index. long getDefaultMaxBytesInMemory(); - - @SuppressWarnings("unused") - boolean useNestedColumnIndexerForSchemaDiscovery(); } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 0085b58859e2..ddb52c309cb8 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -276,8 +276,7 @@ protected IncrementalIndex( final boolean deserializeComplexMetrics, final boolean concurrentEventAdd, final boolean preserveExistingMetrics, - final boolean useMaxMemoryEstimates, - final boolean useNestedColumnIndexerForSchemaDiscovery + final boolean useMaxMemoryEstimates ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -289,7 +288,8 @@ protected IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.preserveExistingMetrics = preserveExistingMetrics; this.useMaxMemoryEstimates = useMaxMemoryEstimates; - this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery; + this.useNestedColumnIndexerForSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec() + .useNestedColumnIndexerForSchemaDiscovery(); this.timeAndMetricsColumnCapabilities = new HashMap<>(); this.metricDescs = Maps.newLinkedHashMap(); diff --git 
a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 1912136a0734..35ccd8961da5 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -126,11 +126,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex // preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can have existing metrics // This is currently only use by auto compaction and should not be use for anything else. boolean preserveExistingMetrics, - boolean useMaxMemoryEstimates, - boolean useNestedColumnIndexerSchemaDiscovery + boolean useMaxMemoryEstimates ) { - super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates, useNestedColumnIndexerSchemaDiscovery); + super( + incrementalIndexSchema, + deserializeComplexMetrics, + concurrentEventAdd, + preserveExistingMetrics, + useMaxMemoryEstimates + ); this.maxRowCount = maxRowCount; this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; this.facts = incrementalIndexSchema.isRollup() ? 
new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) @@ -657,8 +662,7 @@ protected OnheapIncrementalIndex buildInner() maxRowCount, maxBytesInMemory, preserveExistingMetrics, - useMaxMemoryEstimates, - useNestedColumnIndexerForSchemaDiscovery + useMaxMemoryEstimates ); } } @@ -666,7 +670,6 @@ protected OnheapIncrementalIndex buildInner() public static class Spec implements AppendableIndexSpec { private static final boolean DEFAULT_PRESERVE_EXISTING_METRICS = false; - private static final boolean DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY = false; public static final String TYPE = "onheap"; // When set to true, for any row that already has metric (with the same name defined in metricSpec), @@ -676,26 +679,19 @@ public static class Spec implements AppendableIndexSpec // This is currently only use by auto compaction and should not be use for anything else. final boolean preserveExistingMetrics; - final boolean useNestedColumnIndexerForSchemaDiscovery; - public Spec() { this.preserveExistingMetrics = DEFAULT_PRESERVE_EXISTING_METRICS; - this.useNestedColumnIndexerForSchemaDiscovery = DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY; } @JsonCreator public Spec( - final @JsonProperty("preserveExistingMetrics") @Nullable Boolean preserveExistingMetrics, - final @JsonProperty("useNestedColumnIndexerForSchemaDiscovery") @Nullable Boolean useNestedColumnIndexerForSchemaDiscovery + final @JsonProperty("preserveExistingMetrics") @Nullable Boolean preserveExistingMetrics ) { this.preserveExistingMetrics = preserveExistingMetrics != null ? preserveExistingMetrics : DEFAULT_PRESERVE_EXISTING_METRICS; - this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery != null - ? 
useNestedColumnIndexerForSchemaDiscovery - : DEFAULT_USE_NESTED_COLUMN_INDEXER_SCHEMA_DISCOVERY; } @JsonProperty @@ -707,8 +703,7 @@ public boolean isPreserveExistingMetrics() @Override public AppendableIndexBuilder builder() { - return new Builder().setPreserveExistingMetrics(preserveExistingMetrics) - .setUseNestedColumnIndexerForSchemaDiscovery(useNestedColumnIndexerForSchemaDiscovery); + return new Builder().setPreserveExistingMetrics(preserveExistingMetrics); } @Override @@ -720,13 +715,6 @@ public long getDefaultMaxBytesInMemory() return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6; } - @JsonProperty - @Override - public boolean useNestedColumnIndexerForSchemaDiscovery() - { - return useNestedColumnIndexerForSchemaDiscovery; - } - @Override public boolean equals(Object o) { @@ -737,14 +725,13 @@ public boolean equals(Object o) return false; } Spec spec = (Spec) o; - return preserveExistingMetrics == spec.preserveExistingMetrics && - useNestedColumnIndexerForSchemaDiscovery == spec.useNestedColumnIndexerForSchemaDiscovery; + return preserveExistingMetrics == spec.preserveExistingMetrics; } @Override public int hashCode() { - return Objects.hash(preserveExistingMetrics, useNestedColumnIndexerForSchemaDiscovery); + return Objects.hash(preserveExistingMetrics); } } } diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java index 54bfe1dcb59f..e5a1c41bc422 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java @@ -46,7 +46,6 @@ import org.junit.Test; import javax.annotation.Nonnull; -import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -548,13 +547,12 @@ private static IncrementalIndex makeIncrementalIndex(long minTimestamp) new TimestampSpec(TIME_COL, "millis", null), 
Granularities.NONE, VirtualColumns.EMPTY, - new DimensionsSpec(Collections.emptyList()), + DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), new AggregatorFactory[0], false ) ) .setMaxRowCount(1000) - .setUseNestedColumnIndexerForSchemaDiscovery(true) .build(); return index; } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index dad2a4c2134d..02d43e275061 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -125,8 +125,7 @@ public MapIncrementalIndex( maxRowCount, maxBytesInMemory, false, - true, - false + true ); } @@ -150,8 +149,7 @@ public MapIncrementalIndex( maxRowCount, maxBytesInMemory, false, - true, - false + true ); } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java new file mode 100644 index 000000000000..07ed6f7502fa --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +public class OnheapIncrementalIndexTest +{ + private static final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); + + @Test + public void testSerde() throws JsonProcessingException + { + OnheapIncrementalIndex.Spec spec = new OnheapIncrementalIndex.Spec(true); + Assert.assertEquals(spec, MAPPER.readValue(MAPPER.writeValueAsString(spec), OnheapIncrementalIndex.Spec.class)); + } + @Test + public void testSpecEqualsAndHashCode() + { + EqualsVerifier.forClass(OnheapIncrementalIndex.Spec.class) + .usingGetClass() + .verify(); + } +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java index ad8827f2a95b..7b1a7c54682b 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java @@ -88,7 +88,7 @@ public static ClientCompactionTaskQueryTuningConfig from( if (userCompactionTaskQueryTuningConfig == null) { return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, - new OnheapIncrementalIndex.Spec(preserveExistingMetrics, false), + new 
OnheapIncrementalIndex.Spec(preserveExistingMetrics), null, null, null, @@ -111,7 +111,7 @@ public static ClientCompactionTaskQueryTuningConfig from( } else { AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null ? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() - : new OnheapIncrementalIndex.Spec(preserveExistingMetrics, false); + : new OnheapIncrementalIndex.Spec(preserveExistingMetrics); return new ClientCompactionTaskQueryTuningConfig( maxRowsPerSegment, appendableIndexSpecToUse, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 4a48f3280c7c..a0f4cf9a6101 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -259,7 +259,7 @@ public void testSerdeUserCompactionTuningConfigWithAppendableIndexSpec() throws { final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig( 40000, - new OnheapIncrementalIndex.Spec(true, false), + new OnheapIncrementalIndex.Spec(true), 2000L, null, new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java index 947328722964..01c889ad2c3c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java @@ -78,7 +78,7 @@ public void testSerde() throws IOException { final UserCompactionTaskQueryTuningConfig tuningConfig = new 
UserCompactionTaskQueryTuningConfig( 40000, - new OnheapIncrementalIndex.Spec(true, false), + new OnheapIncrementalIndex.Spec(true), 2000L, null, new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index b1f3ad99527c..73abcb8d5eac 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -1523,7 +1523,7 @@ public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() null, new UserCompactionTaskQueryTuningConfig( null, - new OnheapIncrementalIndex.Spec(true, false), + new OnheapIncrementalIndex.Spec(true), null, 1000L, null, @@ -1558,7 +1558,7 @@ public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec() null, new UserCompactionTaskQueryTuningConfig( null, - new OnheapIncrementalIndex.Spec(false, false), + new OnheapIncrementalIndex.Spec(false), null, 1000L, null,