diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index a595b1c6ded7..b36ab87d6545 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -168,6 +168,28 @@ For example: } ``` +### Lines + +Configure the Lines `inputFormat` to load line-oriented data where each line is treated as a single field: + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| type | String | Set value to `lines`. | yes | + +The Lines input format reads each line from the input as UTF-8 text, and creates a single column named `line` containing the entire line as a string. +This is useful for reading line-oriented data in a simple form for later processing. + +For example: + +```json +"ioConfig": { + "inputFormat": { + "type": "lines" + }, + ... +} +``` + ### ORC To use the ORC input format, load the Druid Orc extension ( [`druid-orc-extensions`](../development/extensions-core/orc.md)). diff --git a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java index ebf2dea66875..0103ea66c9cb 100644 --- a/processing/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DelimitedInputFormat; import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.LinesInputFormat; import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.impl.RegexInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; @@ -48,6 +49,7 @@ @JsonSubTypes(value = { @Type(name = CsvInputFormat.TYPE_KEY, value = CsvInputFormat.class), @Type(name = JsonInputFormat.TYPE_KEY, value = JsonInputFormat.class), + @Type(name = LinesInputFormat.TYPE_KEY, value = LinesInputFormat.class), @Type(name = RegexInputFormat.TYPE_KEY, value = RegexInputFormat.class), @Type(name = DelimitedInputFormat.TYPE_KEY, value = DelimitedInputFormat.class) }) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java index 39086b6838e5..f046a894f40b 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java @@ -145,11 +145,7 @@ public boolean shouldTryParseNumbers() @Override public long getWeightedSize(String path, long size) { - CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); - if (CompressionUtils.Format.GZ == compressionFormat) { - return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; - } - return size; + return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path)); } @Override diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 1155bd23e305..a605fd7a8aea 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -174,11 +174,7 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity @Override public long getWeightedSize(String path, long size) { - CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); - if (CompressionUtils.Format.GZ == compressionFormat) { - return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; - } - return size; + return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path)); } /** diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java new file mode 100644 index 000000000000..1d47b71992b1 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LinesInputFormat.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.utils.CompressionUtils; + +import java.io.File; + +/** + * Input format that breaks the input on newlines, and returns a single column named {@link LinesReader#LINE_COLUMN}. + */ +public class LinesInputFormat implements InputFormat +{ + public static final String TYPE_KEY = "lines"; + + @JsonCreator + public LinesInputFormat() + { + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new LinesReader(inputRowSchema, source); + } + + @Override + public long getWeightedSize(String path, long size) + { + return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path)); + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java new file mode 100644 index 000000000000..57f32599e13a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/data/input/impl/LinesReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.TextReader; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.util.List; +import java.util.Map; + +/** + * Reader for {@link LinesInputFormat}. + */ +public class LinesReader extends TextReader.Strings +{ + private static final String LINE_COLUMN = "line"; + + LinesReader( + InputRowSchema inputRowSchema, + InputEntity source + ) + { + super(inputRowSchema, source); + } + + @Override + public List parseInputRows(String intermediateRow) throws ParseException + { + return List.of(MapInputRowParser.parse(getInputRowSchema(), parseLine(intermediateRow))); + } + + @Override + protected List> toMap(String intermediateRow) + { + return List.of(parseLine(intermediateRow)); + } + + private Map parseLine(String line) + { + return Map.of(LINE_COLUMN, line); + } + + @Override + public int getNumHeaderLinesToSkip() + { + return 0; + } + + @Override + public boolean needsToProcessHeaderLine() + { + return false; + } + + @Override + public void processHeaderLine(String line) + { + // Nothing to do. + } +} diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java index a6b823ae8915..a3057fa212b2 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java @@ -96,10 +96,6 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity @Override public long getWeightedSize(String path, long size) { - CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path); - if (CompressionUtils.Format.GZ == compressionFormat) { - return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR; - } - return size; + return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path)); } } diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 7ab05b907196..01b45c5089af 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -32,6 +32,7 @@ import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream; import org.apache.commons.compress.utils.FileNameUtils; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -120,7 +121,7 @@ public static Format fromFileName(@Nullable String fileName) } } - public static final long COMPRESSED_TEXT_WEIGHT_FACTOR = 4L; + public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4; private static final Logger log = new Logger(CompressionUtils.class); private static final int DEFAULT_RETRY_COUNT = 3; private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512 @@ -655,4 +656,19 @@ public static InputStream decompress(final InputStream in, final String fileName return in; } } + + /** + * Returns the estimated compression factor for a given format on plain text. This is used by certain implementations + * of {@link InputFormat#getWeightedSize(String, long)}. + */ + public static int estimatedCompressionFactor(@Nullable final Format format) + { + // The check for GZ specifically was originally added in https://github.com/apache/druid/pull/14307, and later + // moved here. It may make sense to expand this to other (all?) compression formats. + if (format == Format.GZ) { + return COMPRESSED_TEXT_WEIGHT_FACTOR; + } else { + return 1; + } + } } diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java new file mode 100644 index 000000000000..e2b132a37e4c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/data/input/impl/LinesInputFormatTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.utils.CompressionUtils; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +public class LinesInputFormatTest +{ + private ObjectMapper mapper; + + @Before + public void setUp() throws Exception + { + mapper = TestHelper.makeJsonMapper(); + } + + @Test + public void testSerde() throws IOException + { + final LinesInputFormat expected = new LinesInputFormat(); + + final byte[] json = mapper.writeValueAsBytes(expected); + + // Read as map + final Map map = mapper.readValue(json, Map.class); + Assert.assertEquals("lines", map.get("type")); + + // Read as InputFormat + final InputFormat fromJson = mapper.readValue(json, InputFormat.class); + MatcherAssert.assertThat(fromJson, Matchers.instanceOf(LinesInputFormat.class)); + } + + @Test + public void test_getWeightedSize_withoutCompression() + { + final LinesInputFormat format = new LinesInputFormat(); + final long unweightedSize = 100L; + Assert.assertEquals(unweightedSize, format.getWeightedSize("file.txt", unweightedSize)); + } + + @Test + public void test_getWeightedSize_withGzCompression() + { + final LinesInputFormat format = new LinesInputFormat(); + final long unweightedSize = 100L; + Assert.assertEquals( + unweightedSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR, + format.getWeightedSize("file.txt.gz", unweightedSize) + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java new file mode 100644 index 000000000000..faf7d632c0a8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/data/input/impl/LinesReaderTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class LinesReaderTest extends InitializedNullHandlingTest +{ + @Test + public void testSimpleLineParsing() throws IOException + { + final String input = "line1\nline2\nline3"; + final InputEntityReader reader = createReader(input, Collections.emptyList()); + + final List inputRows = readAllRows(reader); + + Assert.assertEquals(3, inputRows.size()); + Assert.assertEquals("line1", inputRows.get(0).getRaw("line")); + Assert.assertEquals("line2", inputRows.get(1).getRaw("line")); + Assert.assertEquals("line3", inputRows.get(2).getRaw("line")); + } + + @Test + public void testEmptyLines() throws IOException + { + final String input = "line1\n\n line3\n"; + final InputEntityReader reader = createReader(input, Collections.emptyList()); + + final List inputRows = readAllRows(reader); + + Assert.assertEquals(3, inputRows.size()); + Assert.assertEquals("line1", inputRows.get(0).getRaw("line")); + Assert.assertEquals("", inputRows.get(1).getRaw("line")); + Assert.assertEquals(" line3", inputRows.get(2).getRaw("line")); + } + + @Test + public void testSingleLine() throws IOException + { + final String input = "single line without newline"; + final InputEntityReader reader = createReader(input, Collections.emptyList()); + + final List inputRows = readAllRows(reader); + + Assert.assertEquals(1, inputRows.size()); + Assert.assertEquals("single line without newline", inputRows.get(0).getRaw("line")); + } + + @Test + public void testToMap() + { + final String input = "test line"; + final LinesReader reader = (LinesReader) createReader(input, Collections.emptyList()); + + final List> maps = reader.toMap("test line"); + + Assert.assertEquals(1, maps.size()); + Assert.assertEquals(ImmutableMap.of("line", "test line"), maps.get(0)); + } + + private InputEntityReader createReader( + String input, + List columns + ) + { + final InputRowSchema inputRowSchema = new InputRowSchema( + new TimestampSpec("__time", "auto", DateTimes.of("2000-01-01T00:00:00.000Z")), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(columns)), + ColumnsFilter.all() + ); + + final ByteEntity entity = new ByteEntity(StringUtils.toUtf8(input)); + + return new LinesReader(inputRowSchema, entity); + } + + private List readAllRows(InputEntityReader reader) throws IOException + { + try (CloseableIterator iterator = reader.read()) { + return Lists.newArrayList(iterator); + } + } +}