diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java index 5940e70e11fd..3ee0f71f62bd 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.java.util.common.parsers.DelimitedParser; import org.apache.druid.java.util.common.parsers.Parser; @@ -123,6 +124,12 @@ public Parser makeParser() ); } + @Override + public InputFormat toInputFormat() + { + return new DelimitedInputFormat(columns, listDelimiter, delimiter, hasHeaderRow, null, skipHeaderRows); + } + @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 9ac35a9fab9e..ae2cd88bf8f8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -126,6 +126,7 @@ private InputFormat getGivenInputFormat() return inputFormat; } + @Nullable public InputFormat getInputFormat(ParseSpec parseSpec) { return inputFormat == null ? Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat() : inputFormat; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 1577833d5fdd..bcdcecce304a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -202,6 +202,7 @@ public enum Status private final SeekableStreamIndexTaskIOConfig ioConfig; private final SeekableStreamIndexTaskTuningConfig tuningConfig; private final InputRowSchema inputRowSchema; + @Nullable private final InputFormat inputFormat; @Nullable private final InputRowParser parser; @@ -372,12 +373,10 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception // Now we can initialize StreamChunkReader with the given toolbox. final StreamChunkParser parser = new StreamChunkParser( this.parser, - new SettableByteEntityReader( - inputFormat, - inputRowSchema, - task.getDataSchema().getTransformSpec(), - toolbox.getIndexingTmpDir() - ) + inputFormat, + inputRowSchema, + task.getDataSchema().getTransformSpec(), + toolbox.getIndexingTmpDir() ); initializeSequences(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java index 3f9b28138d82..6061aff7423b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java @@ -19,12 +19,17 @@ package org.apache.druid.indexing.seekablestream; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.transform.TransformSpec; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -38,20 +43,42 @@ class StreamChunkParser { @Nullable private final InputRowParser parser; + @Nullable private final SettableByteEntityReader byteEntityReader; - StreamChunkParser(@Nullable InputRowParser parser, SettableByteEntityReader byteEntityReader) + /** + * Either parser or inputFormat shouldn't be null. + */ + StreamChunkParser( + @Nullable InputRowParser parser, + @Nullable InputFormat inputFormat, + InputRowSchema inputRowSchema, + TransformSpec transformSpec, + File indexingTmpDir + ) { + if (parser == null && inputFormat == null) { + throw new IAE("Either parser or inputFormat should be set"); + } this.parser = parser; - this.byteEntityReader = byteEntityReader; + if (inputFormat != null) { + this.byteEntityReader = new SettableByteEntityReader( + inputFormat, + inputRowSchema, + transformSpec, + indexingTmpDir + ); + } else { + this.byteEntityReader = null; + } } List parse(List streamChunk) throws IOException { - if (parser != null) { - return parseWithParser(parser, streamChunk); - } else { + if (byteEntityReader != null) { return parseWithInputFormat(byteEntityReader, streamChunk); + } else { + return parseWithParser(parser, streamChunk); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java new file mode 100644 index 000000000000..76a02db31385 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.segment.transform.TransformSpec; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class StreamChunkParserTest +{ + private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(null, null, null); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testWithParserAndNullInputformatParseProperly() throws IOException + { + final InputRowParser parser = new StringInputRowParser( + new NotConvertibleToInputFormatParseSpec(), + StringUtils.UTF8_STRING + ); + final StreamChunkParser chunkParser = new StreamChunkParser( + parser, + // Set nulls for all parameters below since inputFormat will be never used. + null, + null, + null, + null + ); + parseAndAssertResult(chunkParser); + } + + @Test + public void testWithNullParserAndInputformatParseProperly() throws IOException + { + final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap()); + final StreamChunkParser chunkParser = new StreamChunkParser( + null, + inputFormat, + new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()), + TransformSpec.NONE, + temporaryFolder.newFolder() + ); + parseAndAssertResult(chunkParser); + } + + @Test + public void testWithNullParserAndNullInputformatFailToCreateParser() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Either parser or inputFormat should be set"); + final StreamChunkParser chunkParser = new StreamChunkParser( + null, + null, + null, + null, + null + ); + } + + @Test + public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws IOException + { + final InputRowParser parser = new StringInputRowParser( + new NotConvertibleToInputFormatParseSpec(), + StringUtils.UTF8_STRING + ); + final TrackingJsonInputFormat inputFormat = new TrackingJsonInputFormat( + JSONPathSpec.DEFAULT, + Collections.emptyMap() + ); + final StreamChunkParser chunkParser = new StreamChunkParser( + parser, + inputFormat, + new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()), + TransformSpec.NONE, + temporaryFolder.newFolder() + ); + parseAndAssertResult(chunkParser); + Assert.assertTrue(inputFormat.used); + } + + private void parseAndAssertResult(StreamChunkParser chunkParser) throws IOException + { + final String json = "{\"timestamp\": \"2020-01-01\", \"dim\": \"val\", \"met\": \"val2\"}"; + List parsedRows = chunkParser.parse(Collections.singletonList(json.getBytes(StringUtils.UTF8_STRING))); + Assert.assertEquals(1, parsedRows.size()); + InputRow row = parsedRows.get(0); + Assert.assertEquals(DateTimes.of("2020-01-01"), row.getTimestamp()); + Assert.assertEquals("val", Iterables.getOnlyElement(row.getDimension("dim"))); + Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met"))); + } + + private static class NotConvertibleToInputFormatParseSpec extends JSONParseSpec + { + private NotConvertibleToInputFormatParseSpec() + { + super( + TIMESTAMP_SPEC, + DimensionsSpec.EMPTY, + JSONPathSpec.DEFAULT, + Collections.emptyMap() + ); + } + + @Override + public InputFormat toInputFormat() + { + return null; + } + } + + private static class TrackingJsonInputFormat extends JsonInputFormat + { + private boolean used; + + private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec, @Nullable Map featureSpec) + { + super(flattenSpec, featureSpec); + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + used = true; + return super.createReader(inputRowSchema, source, temporaryDirectory); + } + } +}