diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/FlattenJSONBenchmarkUtil.java b/benchmarks/src/main/java/org/apache/druid/benchmark/FlattenJSONBenchmarkUtil.java index 6c0ec31d64a6..557dca72effc 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/FlattenJSONBenchmarkUtil.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/FlattenJSONBenchmarkUtil.java @@ -57,6 +57,7 @@ public Parser getFlatParser() new TimestampSpec("ts", "iso", null), new DimensionsSpec(null, null, null), null, + null, null ); return spec.makeParser(); @@ -71,6 +72,7 @@ public Parser getFieldDiscoveryParser() new TimestampSpec("ts", "iso", null), new DimensionsSpec(null, null, null), flattenSpec, + null, null ); @@ -114,6 +116,7 @@ public Parser getNestedParser() new TimestampSpec("ts", "iso", null), new DimensionsSpec(null, null, null), flattenSpec, + null, null ); @@ -157,6 +160,7 @@ public Parser getForcedPathParser() new TimestampSpec("ts", "iso", null), new DimensionsSpec(null, null, null), flattenSpec, + null, null ); @@ -198,6 +202,7 @@ public Parser getJqParser() new TimestampSpec("ts", "iso", null), new DimensionsSpec(null, null, null), flattenSpec, + null, null ); diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java index 9c167b7bbc62..f805921cc824 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/FileIteratingFirehose.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.NoSuchElementException; @@ -36,7 +37,8 @@ public class FileIteratingFirehose implements Firehose { private final Iterator lineIterators; private final StringInputRowParser parser; - + private Iterator parsedInputRows = new 
ArrayList().iterator(); + private String raw = null; private LineIterator lineIterator = null; private final Closeable closer; @@ -63,11 +65,12 @@ public FileIteratingFirehose( @Override public boolean hasMore() throws IOException { + while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { lineIterator = getNextLineIterator(); } - return lineIterator != null && lineIterator.hasNext(); + return (lineIterator != null && lineIterator.hasNext()) || parsedInputRows.hasNext(); } @Nullable @@ -77,8 +80,17 @@ public InputRow nextRow() throws IOException if (!hasMore()) { throw new NoSuchElementException(); } + if (parsedInputRows.hasNext()) { + return parsedInputRows.next(); + } + return getNextRow(); + } - return parser.parse(lineIterator.next()); + private InputRow getNextRow() + { + raw = lineIterator.next(); + parsedInputRows = parser.parseBatch(raw).iterator(); + return parsedInputRows.hasNext() ? parsedInputRows.next() : null; } @Override @@ -87,10 +99,8 @@ public InputRowPlusRaw nextRowWithRaw() throws IOException if (!hasMore()) { throw new NoSuchElementException(); } - - String raw = lineIterator.next(); try { - return InputRowPlusRaw.of(parser.parse(raw), StringUtils.toUtf8(raw)); + return InputRowPlusRaw.of(nextRow(), StringUtils.toUtf8(raw)); } catch (ParseException e) { return InputRowPlusRaw.of(StringUtils.toUtf8(raw), e); diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java index bf33d5136ec2..79b32125ca5b 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JSONParseSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonParser.Feature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONExplodeSpec; import 
org.apache.druid.java.util.common.parsers.JSONPathParser; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.Parser; @@ -33,17 +34,20 @@ import java.util.Objects; /** + * */ public class JSONParseSpec extends NestedDataParseSpec { private final ObjectMapper objectMapper; private final Map featureSpec; + private final List explodeSpec; @JsonCreator public JSONParseSpec( @JsonProperty("timestampSpec") TimestampSpec timestampSpec, @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("flattenSpec") JSONPathSpec flattenSpec, + @JsonProperty("explodeSpec") List explodeSpec, @JsonProperty("featureSpec") Map featureSpec ) { @@ -54,12 +58,19 @@ public JSONParseSpec( Feature feature = Feature.valueOf(entry.getKey()); objectMapper.configure(feature, entry.getValue()); } + this.explodeSpec = explodeSpec; + } + + @JsonProperty + public List getExplodeSpec() + { + return explodeSpec; } @Deprecated public JSONParseSpec(TimestampSpec ts, DimensionsSpec dims) { - this(ts, dims, null, null); + this(ts, dims, null, null, null); } @Override @@ -70,19 +81,19 @@ public void verify(List usedCols) @Override public Parser makeParser() { - return new JSONPathParser(getFlattenSpec(), objectMapper); + return new JSONPathParser(getFlattenSpec(), getExplodeSpec(), objectMapper); } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new JSONParseSpec(spec, getDimensionsSpec(), getFlattenSpec(), getFeatureSpec()); + return new JSONParseSpec(spec, getDimensionsSpec(), getFlattenSpec(), getExplodeSpec(), getFeatureSpec()); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new JSONParseSpec(getTimestampSpec(), spec, getFlattenSpec(), getFeatureSpec()); + return new JSONParseSpec(getTimestampSpec(), spec, getFlattenSpec(), getExplodeSpec(), getFeatureSpec()); } @JsonProperty @@ -120,6 +131,7 @@ public String toString() "timestampSpec=" + getTimestampSpec() + ", 
dimensionsSpec=" + getDimensionsSpec() + ", flattenSpec=" + getFlattenSpec() + + ", explodeSpec=" + getExplodeSpec() + ", featureSpec=" + featureSpec + '}'; } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java index b872219e7c8f..69201db02251 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/ParseSpec.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.parsers.Parser; import java.util.List; +import java.util.Objects; @ExtensionPoint @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format") @@ -98,12 +99,10 @@ public boolean equals(Object o) ParseSpec parseSpec = (ParseSpec) o; - if (timestampSpec != null ? !timestampSpec.equals(parseSpec.timestampSpec) : parseSpec.timestampSpec != null) { + if (!Objects.equals(timestampSpec, parseSpec.timestampSpec)) { return false; } - return !(dimensionsSpec != null - ? 
!dimensionsSpec.equals(parseSpec.dimensionsSpec) - : parseSpec.dimensionsSpec != null); + return Objects.equals(dimensionsSpec, parseSpec.dimensionsSpec); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/StringInputRowParser.java b/core/src/main/java/org/apache/druid/data/input/impl/StringInputRowParser.java index 07761c0898db..2f50ff375447 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/StringInputRowParser.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/StringInputRowParser.java @@ -25,7 +25,6 @@ import com.google.common.collect.Iterators; import org.apache.druid.data.input.ByteBufferInputRowParser; import org.apache.druid.data.input.InputRow; -import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.Parser; @@ -36,10 +35,12 @@ import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.Map; /** + * */ public class StringInputRowParser implements ByteBufferInputRowParser { @@ -60,7 +61,6 @@ public StringInputRowParser( { this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); this.mapParser = new MapInputRowParser(parseSpec); - if (encoding != null) { this.charset = Charset.forName(encoding); } else { @@ -77,7 +77,9 @@ public StringInputRowParser(ParseSpec parseSpec) @Override public List parseBatch(ByteBuffer input) { - return Utils.nullableListOf(parseMap(buildStringKeyMap(input))); + final List theList = new ArrayList<>(); + buildStringKeyMapList(input).forEach(e -> theList.add(parseMap(e))); + return theList; } @JsonProperty @@ -127,6 +129,34 @@ private Map buildStringKeyMap(ByteBuffer input) return theMap; } + private List> buildStringKeyMapList(ByteBuffer input) + { + int payloadSize = input.remaining(); + if (chars == null || chars.remaining() < 
payloadSize) { + chars = CharBuffer.allocate(payloadSize); + } + + final CoderResult coderResult = charset.newDecoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE) + .decode(input, chars, true); + + List> theMapList; + if (coderResult.isUnderflow()) { + chars.flip(); + try { + initializeParser(); + theMapList = parser.parseToMapList(chars.toString()); + } + finally { + chars.clear(); + } + } else { + throw new ParseException("Failed with CoderResult[%s]", coderResult); + } + return theMapList; + } + public void initializeParser() { if (parser == null) { @@ -142,6 +172,15 @@ public void startFileFromBeginning() parser.startFileFromBeginning(); } + + public List parseBatch(@Nullable String input) + { + initializeParser(); + List returnList = new ArrayList<>(); + parser.parseToMapList(input).forEach(e -> returnList.add(parseMap(e))); + return returnList; + } + @Nullable public InputRow parse(@Nullable String input) { diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java index 9039106ff9d6..0de0d111f450 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java @@ -126,6 +126,12 @@ public void setFieldNames(final String header) } } + @Override + public List> parseToMapList(final String input) + { + return Utils.nullableListOf(parseToMap(input)); + } + @Override public Map parseToMap(final String input) { diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONExplodeSpec.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONExplodeSpec.java new file mode 100644 index 000000000000..7a709fa27596 --- /dev/null +++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONExplodeSpec.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.parsers; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class JSONExplodeSpec +{ + public static final JSONExplodeSpec DEFAULT = + new JSONExplodeSpec(null, null); + + private final String path; + private final String type; + + @JsonCreator + public JSONExplodeSpec( + @JsonProperty("path") String path, + @JsonProperty("type") String type + ) + { + this.path = path; + this.type = type; + } + + @JsonProperty + public String getPath() + { + return path; + } + + @JsonProperty + public String getType() + { + return type; + } + + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final JSONExplodeSpec that = (JSONExplodeSpec) o; + return Objects.equals(path, that.path) && + Objects.equals(type, that.type); + } + + @Override + public int hashCode() + { + return Objects.hash(path, type); + } + + @Override + public String 
toString() + { + return "JSONExplodeSpec{" + + "path=" + path + + ", type=" + type + + '}'; + } +} + diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONExploderMaker.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONExploderMaker.java new file mode 100644 index 000000000000..2fc88ff58e69 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONExploderMaker.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.common.parsers; + +import com.fasterxml.jackson.databind.JsonNode; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; +import org.apache.druid.data.input.impl.FastJacksonJsonNodeJsonProvider; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class JSONExploderMaker implements ObjectExploders.ExploderMaker +{ + private static final Configuration JSONPATH_CONFIGURATION = + Configuration.builder() + .jsonProvider(new FastJacksonJsonNodeJsonProvider()) + .mappingProvider(new JacksonMappingProvider()) + .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) + .build(); + + private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder(); + + + @Override + public List getExplodeArray(final JsonNode obj, String expr) + { + final JsonPath jsonPath = JsonPath.compile(expr); + JsonNode arrayToExplode = jsonPath.read(obj, JSONPATH_CONFIGURATION); + List newList = new ArrayList<>(); + for (JsonNode entry : arrayToExplode) { + if (!entry.isNull()) { + newList.add(entry); + } + } + return newList; + } + + @Override + public JsonNode setNodeWithValue(final JsonNode node, Object value, String expr) + { + final JsonPath jsonPath = JsonPath.compile(expr); + JsonNode replica = node.deepCopy(); + JsonNode retVal = jsonPath.set(replica, value, JSONPATH_CONFIGURATION); + return retVal; + } + + @Override + public Object valueConversionFunction(JsonNode val) + { + if (val == null || val.isNull()) { + return null; + } + + if (val.isBoolean()) { + return val.asBoolean(); + } + + if (val.isInt() || val.isLong()) { + return 
val.asLong(); + } + + if (val.isNumber()) { + return val.asDouble(); + } + + if (val.isTextual()) { + return charsetFix(val.asText()); + } + + if (val.isArray()) { + List newList = new ArrayList<>(); + for (JsonNode entry : val) { + if (!entry.isNull()) { + newList.add(valueConversionFunction(entry)); + } + } + return newList; + } + + if (val.isObject()) { + Map newMap = new LinkedHashMap<>(); + for (Iterator> it = val.fields(); it.hasNext(); ) { + Map.Entry entry = it.next(); + newMap.put(entry.getKey(), valueConversionFunction(entry.getValue())); + } + return newMap; + } + + return val; + } + + @Nullable + private String charsetFix(String s) + { + if (s != null && !enc.canEncode(s)) { + // Some whacky characters are in this string (e.g. \uD900). These are problematic because they are decodeable + // by new String(...) but will not encode into the same character. This dance here will replace these + // characters with something more sane. + return StringUtils.fromUtf8(StringUtils.toUtf8(s)); + } else { + return s; + } + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java index 8d58451f34bb..36ee6f4c5cf6 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java @@ -108,6 +108,10 @@ private Object valueConversionFunction(JsonNode val) return val.asLong(); } + if (val.isBoolean()) { + return val.asBoolean(); + } + if (val.isNumber()) { return val.asDouble(); } diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java index 1adfda7ed4a2..75e8eb04e745 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java +++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONPathParser.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -32,6 +34,8 @@ public class JSONPathParser implements Parser { private final ObjectMapper mapper; private final ObjectFlattener flattener; + private final ObjectExploder exploder; + private final List explodeSpec; /** * Constructor @@ -39,10 +43,12 @@ public class JSONPathParser implements Parser * @param flattenSpec Provide a path spec for flattening and field discovery. * @param mapper Optionally provide an ObjectMapper, used by the parser for reading the input JSON. */ - public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper) + public JSONPathParser(JSONPathSpec flattenSpec, List explodeSpec, ObjectMapper mapper) { this.mapper = mapper == null ? new ObjectMapper() : mapper; this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker()); + this.exploder = ObjectExploders.create(explodeSpec, new JSONExploderMaker()); + this.explodeSpec = explodeSpec; } @Override @@ -60,7 +66,6 @@ public void setFieldNames(Iterable fieldNames) * @param input JSON string. The root must be a JSON object, not an array. * e.g., {"valid": "true"} and {"valid":[1,2,3]} are supported * but [{"invalid": "true"}] and [1,2,3] are not. 
- * * @return A map of field names and values */ @Override @@ -74,4 +79,25 @@ public Map parseToMap(String input) throw new ParseException(e, "Unable to parse row [%s]", input); } } + + @Override + public List> parseToMapList(String input) + { + try { + JsonNode document = mapper.readValue(input, JsonNode.class); + if (explodeSpec != null) { + List explodedNodes = exploder.explode(ImmutableList.of(document)); + List> flattenedMaps = new ArrayList<>(); + for (JsonNode explodedNode : explodedNodes) { + flattenedMaps.add(flattener.flatten(explodedNode)); + } + return flattenedMaps; + } else { + return ImmutableList.of(flattener.flatten(document)); + } + } + catch (Exception e) { + throw new ParseException(e, "Unable to parse row [%s]", input); + } + } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectExploder.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectExploder.java new file mode 100644 index 000000000000..20bf7bbbaab7 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectExploder.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.common.parsers; + +import java.util.List; + +public interface ObjectExploder +{ + List explode(List obj); +} diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectExploders.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectExploders.java new file mode 100644 index 000000000000..c3cd53617963 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectExploders.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.parsers; + +import com.fasterxml.jackson.databind.JsonNode; + +import java.util.ArrayList; +import java.util.List; + +public class ObjectExploders +{ + private ObjectExploders() + { + // No instantiation. 
+ } + + public static ObjectExploder create( + final List explodeSpecList, + final ExploderMaker exploderMaker + ) + { + + return new ObjectExploder() + { + + @Override + public List explode(final List obj) + { + List allObjs = obj; + for (JSONExplodeSpec explodeSpec : explodeSpecList) { + List newAllObjs = new ArrayList<>(); + final String path = explodeSpec.getPath(); + for (T curObj : allObjs) { + List arrayToExplode = exploderMaker.getExplodeArray(curObj, path); + for (Object entry : arrayToExplode) { + T newObj = exploderMaker.setNodeWithValue(curObj, entry, path); + newAllObjs.add(newObj); + } + } + allObjs = newAllObjs; + } + List retval = new ArrayList<>(); + for (T curObj : allObjs) { + retval.add(curObj); + } + return retval; + } + }; + } + + public interface ExploderMaker + { + List getExplodeArray(T node, String expr); + + T setNodeWithValue(T node, Object value, String expr); + + Object valueConversionFunction(JsonNode val); + + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/Parser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/Parser.java index 62c4e496153d..0188628191f7 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/Parser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/Parser.java @@ -19,6 +19,8 @@ package org.apache.druid.java.util.common.parsers; +import org.apache.druid.java.util.common.collect.Utils; + import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -45,6 +47,18 @@ default void startFileFromBeginning() @Nullable Map parseToMap(String input); + + /** + * Parse a String into a List of Maps. The result can be null which means the given input string will be ignored. + * + * @throws ParseException if the String cannot be parsed + */ + @Nullable + default List> parseToMapList(String input) + { + return Utils.nullableListOf(); + } + /** * Set the fieldNames that you expect to see in parsed Maps. 
Deprecated; Parsers should not, in general, be * expected to know what fields they will return. Some individual types of parsers do need to know (like a TSV diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/RegexParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/RegexParser.java index fa38b0e954d5..756d1323e2c2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/RegexParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/RegexParser.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.collect.Utils; @@ -110,6 +111,12 @@ public Map parseToMap(String input) } } + @Override + public List> parseToMapList(String input) + { + return ImmutableList.of(parseToMap(input)); + } + @Override public void setFieldNames(Iterable fieldNames) { diff --git a/core/src/test/java/org/apache/druid/data/input/impl/InputRowParserSerdeTest.java b/core/src/test/java/org/apache/druid/data/input/impl/InputRowParserSerdeTest.java index e79953ce4a6a..bd46eb275b87 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/InputRowParserSerdeTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/InputRowParserSerdeTest.java @@ -52,6 +52,7 @@ public void testStringInputRowParserSerde() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo", "bar")), null, null), null, + null, null ), null @@ -98,6 +99,7 @@ public void testMapInputRowParserSerde() throws Exception null ), null, + null, null ) ); @@ -131,6 +133,7 @@ public void testMapInputRowParserNumbersSerde() throws Exception null ), null, + null, null ) ); @@ -168,6 +171,7 @@ private InputRow 
testCharsetParseHelper(Charset charset) throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo", "bar")), null, null), null, + null, null ), charset.name() @@ -208,6 +212,7 @@ public void testFlattenParse() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(null, null, null), flattenSpec, + null, null ), null diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JSONParseSpecTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JSONParseSpecTest.java index 4dd13b601a69..800be14d5936 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JSONParseSpecTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JSONParseSpecTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import org.apache.druid.TestObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONExplodeSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -34,6 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class JSONParseSpecTest @@ -57,6 +59,7 @@ public void testParseRow() new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), + null, null ); @@ -95,6 +98,7 @@ public void testParseRowWithConditional() new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar", "$.[?(@.something_else)].something_else.foo") ) ), + null, null ); @@ -119,6 +123,7 @@ public void testSerde() throws IOException new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), null, + null, feature ); @@ -132,4 +137,269 @@ public void testSerde() throws IOException 
Assert.assertEquals(Arrays.asList("bar", "foo"), serde.getDimensionsSpec().getDimensionNames()); Assert.assertEquals(feature, serde.getFeatureSpec()); } + + + @Test + public void testParseNoExplode() + { + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), + null, + null, + null + ); + + final String input = "{\"time\":\"2014-01-01T00:00:10Z\",\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}\n"; + final Map explodeMap1 = new HashMap<>(); + + explodeMap1.put("time", "2014-01-01T00:00:10Z"); + explodeMap1.put("dim", "a"); + explodeMap1.put("dimLong", (long) 2); + explodeMap1.put("dimFloat", 3.0); + explodeMap1.put("val", (long) 1); + + + final Parser parser = parseSpec.makeParser(); + final List> parsedRows = parser.parseToMapList(input); + + Assert.assertNotNull(parsedRows); + Assert.assertTrue(explodeMap1.equals(parsedRows.get(0))); + final Map parsedRow1 = parsedRows.get(0); + + Assert.assertNull(parsedRow1.get("jq_omg2")); + Assert.assertNull(parsedRow1.get("path_omg2")); + } + + @Test + public void testParseSimpleExplodeNoFlatten() + { + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), + null, + ImmutableList.of( + new JSONExplodeSpec("$.bidders", null) + ), + null + ); + + final String input = "{\"bidders\":[\"Baron\",\"Caro\",\"Daro\"]}"; + final Map explodeMap1 = new HashMap<>(); + final Map explodeMap2 = new HashMap<>(); + final Map explodeMap3 = new HashMap<>(); + + explodeMap1.put("bidders", "Baron"); + explodeMap2.put("bidders", "Caro"); + explodeMap3.put("bidders", "Daro"); + + final List> expectedRows = ImmutableList.of(explodeMap1, explodeMap2, explodeMap3); + + + final Parser parser = parseSpec.makeParser(); + final List> parsedRows = 
parser.parseToMapList(input); + Assert.assertNotNull(parsedRows); + + for (int i = 0; i < parsedRows.size(); i++) { + Assert.assertTrue(expectedRows.get(i).equals(parsedRows.get(i))); + } + Assert.assertTrue(expectedRows.equals(parsedRows)); + final Map parsedRow1 = parsedRows.get(0); + final Map parsedRow2 = parsedRows.get(1); + + Assert.assertNull(parsedRow1.get("jq_omg2")); + Assert.assertNull(parsedRow1.get("path_omg2")); + Assert.assertNull(parsedRow2.get("bar")); + Assert.assertNull(parsedRow2.get("buzz")); + } + + @Test + public void testParseSimpleExplodeFlatten() + { + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_bidder", "$.bids.bidder"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_bid", "$.bids.bid"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_valid", "$.bids.valid"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_winner", "$.bids.winner") + ) + ), + ImmutableList.of( + new JSONExplodeSpec("$.bids", null) + ), + null + ); + + final String input = "{\"bids\":[{\"bidder\":\"Caroline\",\"bid\":28.00,\"valid\":true,\"winner\":false},{\"bidder\":\"Robert\",\"bid\":30.00,\"valid\":true,\"winner\":true}]}"; + final Map explodeMap1 = new HashMap<>(); + final Map explodeMap2 = new HashMap<>(); + + explodeMap1.put("bids_bidder", "Caroline"); + explodeMap1.put("bids_bid", 28.00); + explodeMap1.put("bids_valid", true); + explodeMap1.put("bids_winner", false); + + explodeMap2.put("bids_bidder", "Robert"); + explodeMap2.put("bids_bid", 30.00); + explodeMap2.put("bids_valid", true); + explodeMap2.put("bids_winner", true); + + final List> expectedRows = ImmutableList.of(explodeMap1, explodeMap2); + + + final Parser parser = parseSpec.makeParser(); + final List> parsedRows = 
parser.parseToMapList(input); + Assert.assertNotNull(parsedRows); + + for (int i = 0; i < parsedRows.size(); i++) { + Assert.assertTrue(expectedRows.get(i).equals(parsedRows.get(i))); + } + Assert.assertTrue(expectedRows.equals(parsedRows)); + final Map parsedRow1 = parsedRows.get(0); + final Map parsedRow2 = parsedRows.get(1); + + Assert.assertNull(parsedRow1.get("jq_omg2")); + Assert.assertNull(parsedRow1.get("path_omg2")); + Assert.assertNull(parsedRow2.get("bar")); + Assert.assertNull(parsedRow2.get("buzz")); + } + + @Test + public void testParseNestedExplode() + { + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_bidder", "$.data.bids.bidder"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_bid", "$.data.bids.bid"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_valid", "$.data.bids.valid"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_winner", "$.data.bids.winner"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_info_bst", "$.data.bids.info.bidStartTime"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_info_round", "$.data.bids.info.round") + ) + ), + ImmutableList.of( + new JSONExplodeSpec("$.data.bids", null), + new JSONExplodeSpec("$.data.bids.info", null) + ), + null + ); + + final String input = "{\"data\":{\"bids\":[{\"bidder\":\"Caroline\",\"bid\":28.00,\"valid\":true,\"winner\":false,\"info\":[{\"bidStartTime\":\"2019-05-08T11:35:00\",\"round\":\"1\"}]},{\"bidder\":\"Robert\",\"bid\":30.00,\"valid\":true,\"winner\":false,\"info\":[{\"bidStartTime\":\"2019-05-08T11:36:00\",\"round\":\"2\"}]}]}}"; + final Map explodeMap1 = new HashMap<>(); + final Map explodeMap2 = new HashMap<>(); + + explodeMap1.put("bids_bidder", "Caroline"); + explodeMap1.put("bids_bid", 28.00); + 
explodeMap1.put("bids_valid", true); + explodeMap1.put("bids_winner", false); + explodeMap1.put("bids_info_bst", "2019-05-08T11:35:00"); + explodeMap1.put("bids_info_round", "1"); + + explodeMap2.put("bids_bidder", "Robert"); + explodeMap2.put("bids_bid", 30.00); + explodeMap2.put("bids_valid", true); + explodeMap2.put("bids_winner", false); + explodeMap2.put("bids_info_bst", "2019-05-08T11:36:00"); + explodeMap2.put("bids_info_round", "2"); + + final List> expectedRows = ImmutableList.of(explodeMap1, explodeMap2); + + final Parser parser = parseSpec.makeParser(); + final List> parsedRows = parser.parseToMapList(input); + + Assert.assertNotNull(parsedRows); + + for (int i = 0; i < parsedRows.size(); i++) { + Assert.assertTrue(expectedRows.get(i).equals(parsedRows.get(i))); + } + Assert.assertTrue(expectedRows.equals(parsedRows)); + final Map parsedRow1 = parsedRows.get(0); + final Map parsedRow2 = parsedRows.get(1); + + Assert.assertNull(parsedRow1.get("jq_omg2")); + Assert.assertNull(parsedRow1.get("path_omg2")); + Assert.assertNull(parsedRow2.get("bar")); + Assert.assertNull(parsedRow2.get("buzz")); + } + + @Test + public void testParseNestedExplodeImbalanced() + { + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_bidder", "$.data.bids.bidder"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_bid", "$.data.bids.bid"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_valid", "$.data.bids.valid"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_winner", "$.data.bids.winner"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_info_bst", "$.data.bids.info.bidStartTime"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "bids_info_round", "$.data.bids.info.round") + ) + ), + ImmutableList.of( + new 
JSONExplodeSpec("$.data.bids", null), + new JSONExplodeSpec("$.data.bids.info", null) + ), + null + ); + final String input = "{\"data\":{\"bids\":[{\"bidder\":\"Caroline\",\"bid\":28.00,\"valid\":true,\"winner\":false," + + "\"info\":[{\"bidStartTime\":\"2019-05-08T11:35:00\",\"round\":\"1\"}]},{\"bidder\":" + + "\"Robert\",\"bid\":30.00,\"valid\":true,\"winner\":false,\"info\":[{\"bidStartTime\":" + + "\"2019-05-08T11:36:00\",\"round\":\"2\"},{\"bidStartTime\":\"2019-05-08T11:36:00\"," + + "\"round\":\"3\"}]}]}}"; + final Map explodeMap1 = new HashMap<>(); + final Map explodeMap2 = new HashMap<>(); + final Map explodeMap3 = new HashMap<>(); + + explodeMap1.put("bids_bidder", "Caroline"); + explodeMap1.put("bids_bid", 28.00); + explodeMap1.put("bids_valid", true); + explodeMap1.put("bids_winner", false); + explodeMap1.put("bids_info_bst", "2019-05-08T11:35:00"); + explodeMap1.put("bids_info_round", "1"); + + explodeMap2.put("bids_bidder", "Robert"); + explodeMap2.put("bids_bid", 30.00); + explodeMap2.put("bids_valid", true); + explodeMap2.put("bids_winner", false); + explodeMap2.put("bids_info_bst", "2019-05-08T11:36:00"); + explodeMap2.put("bids_info_round", "2"); + + explodeMap3.put("bids_bidder", "Robert"); + explodeMap3.put("bids_bid", 30.00); + explodeMap3.put("bids_valid", true); + explodeMap3.put("bids_winner", false); + explodeMap3.put("bids_info_bst", "2019-05-08T11:36:00"); + explodeMap3.put("bids_info_round", "3"); + + final List> expectedRows = ImmutableList.of(explodeMap1, explodeMap2, explodeMap3); + + final Parser parser = parseSpec.makeParser(); + final List> parsedRows = parser.parseToMapList(input); + Assert.assertNotNull(parsedRows); + + for (int i = 0; i < parsedRows.size(); i++) { + Assert.assertTrue(expectedRows.get(i).equals(parsedRows.get(i))); + } + Assert.assertTrue(expectedRows.equals(parsedRows)); + final Map parsedRow1 = parsedRows.get(0); + final Map parsedRow2 = parsedRows.get(1); + + Assert.assertNull(parsedRow1.get("jq_omg2")); + 
Assert.assertNull(parsedRow1.get("path_omg2")); + Assert.assertNull(parsedRow2.get("bar")); + Assert.assertNull(parsedRow2.get("buzz")); + } } + diff --git a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONPathParserTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONPathParserTest.java index 0238e6b11733..0811bcfe486d 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONPathParserTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/JSONPathParserTest.java @@ -56,7 +56,7 @@ public class JSONPathParserTest public void testSimple() { List fields = new ArrayList<>(); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, null); final Map jsonMap = jsonParser.parseToMap(JSON); Assert.assertEquals( "jsonMap", @@ -69,7 +69,7 @@ public void testSimple() public void testWithNumbers() { List fields = new ArrayList<>(); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, null); final Map jsonMap = jsonParser.parseToMap(NUMBERS_JSON); Assert.assertEquals( "jsonMap", @@ -82,7 +82,7 @@ public void testWithNumbers() public void testWithWhackyCharacters() { List fields = new ArrayList<>(); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, null); final Map jsonMap = jsonParser.parseToMap(WHACKY_CHARACTER_JSON); Assert.assertEquals( "jsonMap", @@ -112,7 +112,7 @@ public void testNestingWithFieldDiscovery() fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq-met-array", ".met.a")); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, 
fields), null, null); final Map jsonMap = jsonParser.parseToMap(NESTED_JSON); // Root fields @@ -173,7 +173,7 @@ public void testNestingNoDiscovery() fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq-heybarx0", ".hey[0].barx")); fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq-met-array", ".met.a")); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null, null); final Map jsonMap = jsonParser.parseToMap(NESTED_JSON); // Root fields @@ -210,7 +210,7 @@ public void testRejectDuplicates() thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Cannot have duplicate field definition: met-array"); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null, null); jsonParser.parseToMap(NESTED_JSON); } @@ -224,7 +224,7 @@ public void testRejectDuplicates2() thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Cannot have duplicate field definition: met-array"); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null, null); jsonParser.parseToMap(NESTED_JSON); } @@ -236,7 +236,7 @@ public void testParseFail() thrown.expect(ParseException.class); thrown.expectMessage("Unable to parse row [" + NOT_JSON + "]"); - final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null); + final Parser jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, null); jsonParser.parseToMap(NOT_JSON); } } diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java index 
db98c9e70db5..c6d9c975f1d8 100644 --- a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java +++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputRowParserTest.java @@ -68,7 +68,9 @@ public void setUp() new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"), new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName") ) - ), null + ), + null, + null ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 5ede763b8d62..4c6beb9675ba 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -82,6 +82,7 @@ public class KafkaSamplerSpecTest null ), new JSONPathSpec(true, ImmutableList.of()), + null, ImmutableMap.of() ), StandardCharsets.UTF_8.name() diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 89ec2b6541c8..bbee2ea1fe3f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3518,6 +3518,7 @@ private static DataSchema getDataSchema(String dataSource) null ), new JSONPathSpec(true, ImmutableList.of()), + null, ImmutableMap.of() ), StandardCharsets.UTF_8.name() diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 274549891b92..76cefcdab323 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -182,7 +182,7 @@ public static Iterable constructorFeeder() private AppenderatorsManager appenderatorsManager; private final Set checkpointRequestsHash = new HashSet<>(); private RowIngestionMetersFactory rowIngestionMetersFactory; - + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 3cfcd97208ea..01c8e61a2b82 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -87,6 +87,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport null ), new JSONPathSpec(true, ImmutableList.of()), + null, ImmutableMap.of() ), StandardCharsets.UTF_8.name() diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index b9a1ed6e01aa..c1bf711875a8 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -4991,6 +4991,7 @@ private static DataSchema getDataSchema(String dataSource) null ), new JSONPathSpec(true, ImmutableList.of()), + null, ImmutableMap.of() ), StandardCharsets.UTF_8.name() diff --git a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/UriExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/UriExtractionNamespace.java index 75de73d6c200..aff1262379fa 100644 --- a/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/UriExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/org/apache/druid/query/lookup/namespace/UriExtractionNamespace.java @@ -540,6 +540,7 @@ public JSONFlatDataParser( new JSONPathFieldSpec(JSONPathFieldType.ROOT, valueFieldName, valueFieldName) ) ), + null, jsonMapper.copy() ), keyFieldName, diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java index a2c8e9c8ba86..942b20fc35bb 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParserTest.java @@ -71,7 +71,9 @@ public void setUp() new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"), new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar") ) - ), null + ), + null, + null ); } @@ -81,7 +83,7 @@ public void testShortMessageType() { //configure parser with desc file, and specify which file name to use @SuppressWarnings("unused") // expected to create parser without exception 
- ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "ProtoTestEvent"); parser.initDescriptor(); } @@ -91,7 +93,11 @@ public void testLongMessageType() { //configure parser with desc file, and specify which file name to use @SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "prototest.ProtoTestEvent"); + ProtobufInputRowParser parser = new ProtobufInputRowParser( + parseSpec, + "prototest.desc", + "prototest.ProtoTestEvent" + ); parser.initDescriptor(); } @@ -101,7 +107,7 @@ public void testBadProto() { //configure parser with desc file @SuppressWarnings("unused") // expected exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "BadName"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", "BadName"); parser.initDescriptor(); } @@ -110,7 +116,7 @@ public void testMalformedDescriptorUrl() { //configure parser with non existent desc file @SuppressWarnings("unused") // expected exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "file:/nonexist.desc", "BadName"); + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "file:/nonexist.desc", "BadName"); parser.initDescriptor(); } @@ -119,7 +125,7 @@ public void testSingleDescriptorNoMessageType() { // For the backward compatibility, protoMessageType allows null when the desc file has only one message type. 
@SuppressWarnings("unused") // expected to create parser without exception - ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", null); + ProtobufInputRowParser parser = new ProtobufInputRowParser(parseSpec, "prototest.desc", null); parser.initDescriptor(); } diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java index e13663ea3019..e83f693dfcef 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java @@ -70,6 +70,7 @@ public class HadoopDruidIndexerMapperTest null ), new JSONPathSpec(true, ImmutableList.of()), + null, ImmutableMap.of() ) ), diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 4cc8ac5018ff..ff42d9a1322f 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -338,6 +338,7 @@ public static Collection constructFeed() new TimestampSpec("ts", "yyyyMMddHH", null), new DimensionsSpec(null, null, null), null, + null, null ), null @@ -371,6 +372,7 @@ public static Collection constructFeed() new TimestampSpec("ts", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("B", "F", "M", "Q", "X", "Y")), null, null), null, + null, null ), null diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 51b44f18d117..d27790bf9fe0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -708,7 +708,6 @@ private Map> collectIntervalsAndShardSp try ( final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser(), firehoseTempDir) ) { - while (firehose.hasMore()) { try { final InputRow inputRow = firehose.nextRow(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/FirehoseSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/FirehoseSampler.java index e0ae3ebe65e1..fa8af7770107 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/FirehoseSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/FirehoseSampler.java @@ -88,6 +88,13 @@ public Map parseToMap(String input) throw new ParseException(null); } + + @Override + public List> parseToMapList(String input) + { + throw new ParseException(null); + } + @Override public void setFieldNames(Iterable fieldNames) { @@ -162,7 +169,6 @@ public SamplerResponse sample(FirehoseFactory firehoseFactory, DataSchema dataSc if (samplerConfig == null) { samplerConfig = SamplerConfig.empty(); } - final InputRowParser parser = dataSchema.getParser() != null ? 
dataSchema.getParser() : (firehoseFactory instanceof AbstractTextFilesFirehoseFactory @@ -208,14 +214,12 @@ public SamplerResponse sample(FirehoseFactory firehoseFactory, DataSchema dataSc String raw = null; try { final InputRowPlusRaw row = firehose.nextRowWithRaw(); - if (row == null || row.isEmpty()) { continue; } if (row.getRaw() != null) { raw = StringUtils.fromUtf8(row.getRaw()); - if (!usingCachedData) { dataToCache.add(row.getRaw()); } @@ -248,7 +252,6 @@ public SamplerResponse sample(FirehoseFactory firehoseFactory, DataSchema dataSc counter++; } } - final List columnNames = index.getColumnNames(); columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java index dc7e04d71420..1c6c44bcb53d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerCache.java @@ -44,7 +44,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.NoSuchElementException; public class SamplerCache @@ -124,6 +123,8 @@ public static class SamplerCacheFirehose implements Firehose { private final ByteBufferInputRowParser parser; private final Iterator it; + private Iterator parsedInputRows = new ArrayList().iterator(); + private byte[] raw = null; public SamplerCacheFirehose(ByteBufferInputRowParser parser, Collection data) { @@ -138,7 +139,7 @@ public SamplerCacheFirehose(ByteBufferInputRowParser parser, Collection @Override public boolean hasMore() { - return it.hasNext(); + return it.hasNext() || parsedInputRows.hasNext(); } @Nullable @@ -148,9 +149,12 @@ public InputRow nextRow() if (!hasMore()) { throw new NoSuchElementException(); } - - List rows = 
parser.parseBatch(ByteBuffer.wrap(it.next())); - return rows.isEmpty() ? null : rows.get(0); + if (parsedInputRows.hasNext()) { + return parsedInputRows.next(); + } + raw = it.next(); + parsedInputRows = parser.parseBatch(ByteBuffer.wrap(raw)).iterator(); + return parsedInputRows.hasNext() ? parsedInputRows.next() : null; } @Override @@ -159,12 +163,9 @@ public InputRowPlusRaw nextRowWithRaw() if (!hasMore()) { throw new NoSuchElementException(); } - - byte[] raw = it.next(); - try { - List rows = parser.parseBatch(ByteBuffer.wrap(raw)); - return InputRowPlusRaw.of(rows.isEmpty() ? null : rows.get(0), raw); + InputRow myrow = nextRow(); + return InputRowPlusRaw.of(myrow, raw); } catch (ParseException e) { return InputRowPlusRaw.of(raw, e); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java index f55c854d0e2c..ec1acb73d090 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java @@ -34,9 +34,12 @@ import java.io.File; import java.io.InputStream; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Deque; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.Optional; public class TestFirehose implements Firehose @@ -113,6 +116,8 @@ public FiniteFirehoseFactory withSplit(InputSplit split) private InputRowParser parser; private boolean closed; + private Iterator parsedInputRows = new ArrayList().iterator(); + private Object raw = null; private TestFirehose(InputRowParser parser, boolean waitForClose, List seedRows) { @@ -144,7 +149,7 @@ public boolean hasMore() while (queue.isEmpty() && !closed) { wait(); } - return !queue.isEmpty(); + return !queue.isEmpty() || parsedInputRows.hasNext(); } } catch 
(InterruptedException e) { @@ -157,33 +162,42 @@ public boolean hasMore() public InputRow nextRow() { synchronized (this) { - final InputRow row = parser instanceof StringInputRowParser - ? ((StringInputRowParser) parser).parse((String) queue.removeFirst().orElse(null)) - : (InputRow) parser.parseBatch(queue.removeFirst().orElse(null)).get(0); - if (row != null && row.getRaw(FAIL_DIM) != null) { - throw new ParseException(FAIL_DIM); + if (!hasMore()) { + throw new NoSuchElementException(); + } + if (parsedInputRows.hasNext()) { + return parsedInputRows.next(); + } + raw = queue.removeFirst().orElse(null); + parsedInputRows = parser instanceof StringInputRowParser + ? ((StringInputRowParser) parser).parseBatch((String) raw).iterator() + : parser.parseBatch(raw).iterator(); + if (parsedInputRows.hasNext()) { + InputRow row = parsedInputRows.next(); + if (row != null && row.getRaw(FAIL_DIM) != null) { + throw new ParseException(FAIL_DIM); + } + return row; + } else { + throw new ParseException(null); } - return row; } } @Override public InputRowPlusRaw nextRowWithRaw() { - Object next = queue.removeFirst().orElse(null); - synchronized (this) { try { - final InputRow row = parser instanceof StringInputRowParser - ? ((StringInputRowParser) parser).parse((String) next) - : (InputRow) parser.parseBatch(next).get(0); - + final InputRow row = nextRow(); + final Object next = raw; if (row != null && row.getRaw(FAIL_DIM) != null) { throw new ParseException(FAIL_DIM); } return InputRowPlusRaw.of(row, next != null ? StringUtils.toUtf8(next.toString()) : null); } catch (ParseException e) { + final Object next = raw; return InputRowPlusRaw.of(next != null ? 
StringUtils.toUtf8(next.toString()) : null, e); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 8cb99452a80b..021b57ad8d64 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1029,6 +1029,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception new ArrayList<>() ), null, + null, null ), null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 37d7fcee9858..735a16e05c2b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -235,6 +235,7 @@ public Collection getDatabaseSegmentDataSourceSegments(String dataS ImmutableList.of() ), null, + null, null ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index e29f0b985fed..4d83612ac04f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -96,6 +96,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest null ), null, + null, null ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/FirehoseSamplerTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/FirehoseSamplerTest.java index 06ab5dc1b58e..5bb1f07df540 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/FirehoseSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/FirehoseSamplerTest.java @@ -39,6 +39,10 @@ import org.apache.druid.indexing.overlord.sampler.SamplerResponse.SamplerResponseRow; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONExplodeSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.expression.TestExprMacroTable; @@ -104,6 +108,7 @@ private enum ParserType "bad_timestamp,foo,,6" ); + private SamplerCache samplerCache; private FirehoseSampler firehoseSampler; private ParserType parserType; @@ -142,6 +147,7 @@ public void testNoParams() firehoseSampler.sample(null, null, null); } + @Test public void testNoDataSchema() { @@ -264,6 +270,301 @@ public void testMissingValueTimestampSpec() ), data.get(5)); } + @Test + public void testNoFlattenExplode() + { + final List str_json_rows_simple = ImmutableList.of( + "{\"host\":\"kaka\",\"topic\":\"sky\"}" + ); + FirehoseFactory firehoseFactory = getFirehoseFactory(str_json_rows_simple); + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("!!!_no_such_column_!!!", null, DateTimes.of("2010-01-01T00:00:00Z")), + new DimensionsSpec(null), + null, + null, + null + ); + + DataSchema dataSchema = new DataSchema("sampler", OBJECT_MAPPER.convertValue( + new StringInputRowParser(parseSpec, 
StandardCharsets.UTF_8.name()), + new TypeReference>() + { + } + ), null, null, null, OBJECT_MAPPER); + + SamplerResponse response = firehoseSampler.sample( + firehoseFactory, + dataSchema, + new SamplerConfig(4, null, false, null) + ); + + + Assert.assertEquals(1, (int) response.getNumRowsRead()); + Assert.assertEquals(1, (int) response.getNumRowsIndexed()); + Assert.assertEquals(1, response.getData().size()); + + List data = response.getData(); + + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_simple.get(0).toString(), + ImmutableMap.of( + "__time", + 1262304000000L, + "host", + "kaka", + "topic", + "sky" + ), + null, + null + ), data.get(0)); + + } + + @Test + public void testExplode() + { + final List str_json_rows_explode = ImmutableList.of( + "{ \"t\": \"2019-04-22T12:00\", \"dim1\": \"foo1\", \"bidders\":[\"Baron\",\"Caro\",\"Daro\"]}", + "{ \"t\": \"2019-04-23T12:00\", \"dim1\": \"foo2\", \"bidders\":[\"Aaron\",\"Baro\",\"Caro\"]}", + "{ \"t\": \"2019-04-24T12:00\", \"dim1\": \"foo3\", \"bidders\":[\"Caron\",\"Daro\",\"Earo\"]}" + ); + FirehoseFactory firehoseFactory = getFirehoseFactory(str_json_rows_explode); + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("t", null, null), new DimensionsSpec(null), + null, + ImmutableList.of( + new JSONExplodeSpec("$.bidders", "path") + ), + null + ); + + DataSchema dataSchema = new DataSchema("sampler", OBJECT_MAPPER.convertValue( + new StringInputRowParser(parseSpec, StandardCharsets.UTF_8.name()), + new TypeReference>() + { + } + ), null, null, null, OBJECT_MAPPER); + + SamplerResponse response = firehoseSampler.sample( + firehoseFactory, + dataSchema, + new SamplerConfig(4, null, false, null) + ); + + + Assert.assertEquals(4, (int) response.getNumRowsRead()); + Assert.assertEquals(4, (int) response.getNumRowsIndexed()); + Assert.assertEquals(4, response.getData().size()); + + List data = response.getData(); + + Assert.assertEquals(new SamplerResponseRow( + 
str_json_rows_explode.get(0).toString(), + ImmutableMap.of( + "bidders", + "Baron", + "__time", + 1555934400000L, + "dim1", + "foo1" + ), + null, + null + ), data.get(0)); + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode.get(0).toString(), + ImmutableMap.of( + "bidders", + "Caro", + "__time", + 1555934400000L, + "dim1", + "foo1" + ), + null, + null + ), data.get(1)); + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode.get(0).toString(), + ImmutableMap.of( + "bidders", + "Daro", + "__time", + 1555934400000L, + "dim1", + "foo1" + ), + null, + null + ), data.get(2)); + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode.get(1).toString(), + ImmutableMap.of( + "bidders", + "Aaron", + "__time", + 1556020800000L, + "dim1", + "foo2" + ), + null, + null + ), data.get(3)); + } + + @Test + public void testExplodeMore() + { + final List str_json_rows_explode_complex = ImmutableList.of( + "{\"messages\":[{\"host\":\"clarity\",\"topic\":\"moon\"}],\"value\":5}", + "{\"messages\":[{\"host\":\"kaka\",\"topic\":\"sky\"}],\"value\":6}", + "{\"messages\":[{\"host\":\"pivot\",\"popic\":\"sun\"},{\"host\":\"apm\",\"popic\":\"lol\"}],\"value\":4}" + ); + FirehoseFactory firehoseFactory = getFirehoseFactory(str_json_rows_explode_complex); + final JSONParseSpec parseSpec = new JSONParseSpec( + new TimestampSpec("!!!_no_such_column_!!!", null, DateTimes.of("2010-01-01T00:00:00Z")), + new DimensionsSpec(null), + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "messages.host", "$.messages.host"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "messages.popic", "$.messages.popic"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "messages.topic", "$.messages.topic") + ) + ), + ImmutableList.of( + new JSONExplodeSpec("$.messages", "path") + ), + null + ); + + DataSchema dataSchema = new DataSchema("sampler", OBJECT_MAPPER.convertValue( + new StringInputRowParser(parseSpec, 
StandardCharsets.UTF_8.name()), + new TypeReference>() + { + } + ), null, null, null, OBJECT_MAPPER); + + SamplerResponse response = firehoseSampler.sample( + firehoseFactory, + dataSchema, + new SamplerConfig(3, null, false, null) + ); + + List data = response.getData(); + Assert.assertEquals(3, (int) response.getNumRowsRead()); + Assert.assertEquals(3, (int) response.getNumRowsIndexed()); + Assert.assertEquals(3, response.getData().size()); + + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode_complex.get(0).toString(), + ImmutableMap.of( + "__time", + 1262304000000L, + "messages.topic", + "moon", + "messages.host", + "clarity", + "value", + "5" + ), + null, + null + ), data.get(0)); + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode_complex.get(1).toString(), + ImmutableMap.of( + "__time", + 1262304000000L, + "messages.topic", + "sky", + "messages.host", + "kaka", + "value", + "6" + ), + null, + null + ), data.get(1)); + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode_complex.get(2).toString(), + ImmutableMap.of( + "__time", + 1262304000000L, + "messages.popic", + "sun", + "messages.host", + "pivot", + "value", + "4" + ), + null, + null + ), data.get(2)); + + // check if cache work fine + String cacheKey = response.getCacheKey(); + SamplerResponse cached_response = firehoseSampler.sample( + firehoseFactory, + dataSchema, + new SamplerConfig(500, cacheKey, false, 15000) + ); + data = cached_response.getData(); + + Assert.assertEquals(3, (int) response.getNumRowsRead()); + Assert.assertEquals(3, (int) response.getNumRowsIndexed()); + Assert.assertEquals(3, response.getData().size()); + + + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode_complex.get(0).toString(), + ImmutableMap.of( + "__time", + 1262304000000L, + "messages.topic", + "moon", + "messages.host", + "clarity", + "value", + "5" + ), + null, + null + ), data.get(0)); + Assert.assertEquals(new SamplerResponseRow( + 
str_json_rows_explode_complex.get(1).toString(), + ImmutableMap.of( + "__time", + 1262304000000L, + "messages.topic", + "sky", + "messages.host", + "kaka", + "value", + "6" + ), + null, + null + ), data.get(1)); + Assert.assertEquals(new SamplerResponseRow( + str_json_rows_explode_complex.get(2).toString(), + ImmutableMap.of( + "__time", + 1262304000000L, + "messages.popic", + "sun", + "messages.host", + "pivot", + "value", + "4" + ), + null, + null + ), data.get(2)); + + } + @Test public void testWithTimestampSpec() { @@ -786,6 +1087,7 @@ private List getTestRows() } } + private FirehoseFactory getFirehoseFactory(List seedRows) { return ParserType.MAP.equals(parserType) @@ -808,7 +1110,7 @@ private ParseSpec getParseSpec(TimestampSpec timestampSpec, DimensionsSpec dimen ImmutableList.of("t", "dim1", "dim2", "met1"), false, 0 - ) : new JSONParseSpec(timestampSpec, dimensionsSpec, null, null); + ) : new JSONParseSpec(timestampSpec, dimensionsSpec, null, null, null); } private String getUnparseableTimestampString() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index b8c2715b1384..4fc1cb4adf7f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -116,7 +116,8 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport null ), new JSONPathSpec(true, ImmutableList.of()), - ImmutableMap.of() + null, + null ), StandardCharsets.UTF_8.name() ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 9d22d37178cd..3f56a70a2b1c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -571,6 +571,7 @@ private static DataSchema getDataSchema() null ), new JSONPathSpec(true, ImmutableList.of()), + null, ImmutableMap.of() ), StandardCharsets.UTF_8.name() diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformingStringInputRowParser.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformingStringInputRowParser.java index d9a0aa2d954a..3617e3ea8acb 100644 --- a/processing/src/main/java/org/apache/druid/segment/transform/TransformingStringInputRowParser.java +++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformingStringInputRowParser.java @@ -50,6 +50,12 @@ public List parseBatch(final ByteBuffer input) return super.parseBatch(input).stream().map(transformer::transform).collect(Collectors.toList()); } + @Override + public List parseBatch(final String input) + { + return super.parseBatch(input).stream().map(transformer::transform).collect(Collectors.toList()); + } + @Nullable @Override public InputRow parse(@Nullable final String input) diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java index bfb0503f047c..cb93b2cf8874 100644 --- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java +++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java @@ -127,6 +127,7 @@ private Druids.ScanQueryBuilder newTestQuery() ImmutableList.of() ), null, + null, null ) ); diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java index 6ce4f0d985cf..62fc8c043491 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java @@ -148,7 +148,6 @@ public InputRowParser getParser() dimensionExclusions.addAll(aggregator.requiredFields()); dimensionExclusions.add(aggregator.getName()); } - if (inputRowParser.getParseSpec() != null) { final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec(); final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec(); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java index 0c93d25d652e..f88a41d9f84d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/InlineFirehose.java @@ -32,6 +32,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Iterator; import java.util.NoSuchElementException; /** @@ -41,6 +43,8 @@ public class InlineFirehose implements Firehose { private final StringInputRowParser parser; private final LineIterator lineIterator; + private Iterator parsedInputRows = new ArrayList().iterator(); + private String raw = null; InlineFirehose(String data, StringInputRowParser parser) throws IOException { @@ -53,6 +57,11 @@ public class InlineFirehose implements Firehose @Override public boolean hasMore() + { + return lineIterator.hasNext() || parsedInputRows.hasNext(); + } + + private boolean hasMoreRaw() { return lineIterator.hasNext(); } @@ -60,24 +69,31 @@ public boolean hasMore() @Override public InputRow nextRow() { - return 
parser.parse(nextRaw()); + if (parsedInputRows.hasNext()) { + return parsedInputRows.next(); + } + parsedInputRows = parser.parseBatch(nextRaw()).iterator(); + if (parsedInputRows.hasNext()) { + return parsedInputRows.next(); + } else { + throw new NoSuchElementException(); + } } private String nextRaw() { - if (!hasMore()) { + if (!hasMoreRaw()) { throw new NoSuchElementException(); } - - return lineIterator.next(); + raw = lineIterator.next(); + return raw; } @Override public InputRowPlusRaw nextRowWithRaw() { - String raw = nextRaw(); try { - return InputRowPlusRaw.of(parser.parse(raw), StringUtils.toUtf8(raw)); + return InputRowPlusRaw.of(nextRow(), StringUtils.toUtf8(raw)); } catch (ParseException e) { return InputRowPlusRaw.of(StringUtils.toUtf8(raw), e); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java index 5a132c35d37f..184882eeff41 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java @@ -35,6 +35,8 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -77,6 +79,7 @@ class TimedShutoffFirehose implements Firehose { private final Firehose firehose; private final ScheduledExecutorService shutdownExec; + private Iterator parsedInputRows = new ArrayList().iterator(); @GuardedBy("this") private boolean closed = false; @@ -85,7 +88,6 @@ class TimedShutoffFirehose implements Firehose firehose = sampling ? 
delegateFactory.connectForSampler(parser, temporaryDirectory) : delegateFactory.connect(parser, temporaryDirectory); - shutdownExec = Execs.scheduledSingleThreaded("timed-shutoff-firehose-%d"); shutdownExec.schedule( diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java index 437efd313ce0..db8e6956e5ab 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java @@ -72,6 +72,7 @@ public void testDefaultExclusions() new TimestampSpec("time", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null), null, + null, null ), null @@ -109,6 +110,7 @@ public void testExplicitInclude() null ), null, + null, null ), null @@ -146,6 +148,7 @@ public void testTransformSpec() null ), null, + null, null ), null @@ -204,6 +207,7 @@ public void testOverlapMetricNameAndDim() "metric1" )), ImmutableList.of("dimC"), null), null, + null, null ), null @@ -237,6 +241,7 @@ public void testDuplicateAggregators() null ), null, + null, null ), null @@ -302,6 +307,7 @@ public void testEmptyDatasource() null ), null, + null, null ), null @@ -397,6 +403,7 @@ public void testSerde() throws Exception new TimestampSpec("xXx", null, null), new DimensionsSpec(null, Arrays.asList("metric1", "xXx", "col1"), null), null, + null, null ) ); @@ -424,6 +431,7 @@ public void testSerdeWithUpdatedDataSchemaAddedField() throws IOException new TimestampSpec("time", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null), null, + null, null ), null @@ -463,6 +471,7 @@ public void testSerdeWithUpdatedDataSchemaRemovedField() throws IOException new TimestampSpec("time", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null), 
null, + null, null ), null diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java index 2cd2c74f37c2..79fb5d10d7ed 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java @@ -89,6 +89,7 @@ public void testSerde() throws Exception null ), null, + null, null ), null diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index 2bca96ac7486..9c53c9ab394f 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -130,6 +130,7 @@ public AppenderatorTester( new TimestampSpec("ts", "auto", null), new DimensionsSpec(null, null, null), null, + null, null ) ), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index fed1d705e9c9..9e9d350a30ab 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -115,6 +115,7 @@ public int columnCacheSizeBytes() new TimestampSpec("ts", "auto", null), new DimensionsSpec(null, null, null), null, + null, null ) ), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java index 878a7d4b83e5..766c2a3596b7 
100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java @@ -77,6 +77,7 @@ public void setUp() null ), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null), null, + null, null ) ), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java index 683e870bae03..93d259fab676 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java @@ -93,6 +93,7 @@ public void setUp() null ), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null), null, + null, null ) ), @@ -240,6 +241,7 @@ public void testDuplicateRegistering() null ), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null), null, + null, null ) ), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java index f24cc8528ec7..99b04c73c0e5 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/InlineFirehoseTest.java @@ -19,12 +19,19 @@ package org.apache.druid.segment.realtime.firehose; +import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowPlusRaw; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JSONParseSpec; import 
org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.parsers.JSONExplodeSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.junit.Assert; import org.junit.Test; @@ -63,6 +70,26 @@ public class InlineFirehoseTest ), CHARSET.name() ); + + private static final StringInputRowParser JSON_PARSER = new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("!!!_no_such_column_!!!", null, DateTimes.of("2010-01-01T00:00:00Z")), + new DimensionsSpec(null), + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "messages.topic", "$.messages.topic"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "messages.popic", "$.messages.popic"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "messages.host", "$.messages.host") + ) + ), + ImmutableList.of( + new JSONExplodeSpec("$.messages", null) + ), + null + ), + CHARSET.name() + ); private static final String EMPTY = ""; private static final String TIMESTAMP_0 = "0"; private static final String VALUE_0 = "a"; @@ -75,6 +102,7 @@ public class InlineFirehoseTest private static final String LINE_1 = TIMESTAMP_1 + DELIMITER + VALUE_1; private static final String MULTILINE = LINE_0 + "\n" + LINE_1; + @Test public void testHasMoreEmpty() { @@ -193,6 +221,24 @@ public void testMultiline() Assert.assertFalse(target.hasMore()); } + @Test + public void testExplodeSpec() throws IOException + { + String data = + "{\"messages\":[{\"host\":\"clarity\",\"topic\":\"moon\"},{\"host\":\"kaka\",\"topic\":\"sky\"}],\"value\":5}\n" + + "{\"messages\":[{\"host\":\"pivot\",\"popic\":\"sun\"},{\"host\":\"apm\",\"popic\":\"lol\"}],\"value\":4}\n" + + 
"{\"messages\":[{\"host\":\"imply\",\"dopik\":\"fun\"}],\"value\":2}"; + InlineFirehose testFirehose = new InlineFirehose(data, JSON_PARSER); + int count = 0; + while (testFirehose.hasMore()) { + InputRowPlusRaw parsed = testFirehose.nextRowWithRaw(); + count++; + } + Assert.assertEquals(5, count); + + + } + private static InlineFirehose create(String data) { try { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 617f17a700fc..0f0747b11334 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -141,6 +141,7 @@ public void setUp() throws Exception new TimestampSpec("timestamp", "auto", null), new DimensionsSpec(null, null, null), null, + null, null ), null @@ -161,6 +162,7 @@ public void setUp() throws Exception new TimestampSpec("timestamp", "auto", null), new DimensionsSpec(null, null, null), null, + null, null ), null