Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/ingestion/data-formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,28 @@ For example:
}
```

### Lines

Configure the Lines `inputFormat` to load line-oriented data where each line is treated as a single field:

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | Set value to `lines`. | yes |

The Lines input format reads each line from the input as UTF-8 text, and creates a single column named `line` containing the entire line as a string.
This is useful for reading line-oriented data in a simple form for later processing.

For example:

```json
"ioConfig": {
"inputFormat": {
"type": "lines"
},
...
}
```

### ORC

To use the ORC input format, load the Druid ORC extension ([`druid-orc-extensions`](../development/extensions-core/orc.md)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DelimitedInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LinesInputFormat;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
Expand All @@ -48,6 +49,7 @@
@JsonSubTypes(value = {
@Type(name = CsvInputFormat.TYPE_KEY, value = CsvInputFormat.class),
@Type(name = JsonInputFormat.TYPE_KEY, value = JsonInputFormat.class),
@Type(name = LinesInputFormat.TYPE_KEY, value = LinesInputFormat.class),
@Type(name = RegexInputFormat.TYPE_KEY, value = RegexInputFormat.class),
@Type(name = DelimitedInputFormat.TYPE_KEY, value = DelimitedInputFormat.class)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,7 @@ public boolean shouldTryParseNumbers()
@Override
public long getWeightedSize(String path, long size)
{
CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
if (CompressionUtils.Format.GZ == compressionFormat) {
return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
}
return size;
return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,7 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
@Override
public long getWeightedSize(String path, long size)
{
CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
if (CompressionUtils.Format.GZ == compressionFormat) {
return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
}
return size;
return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.utils.CompressionUtils;

import java.io.File;

/**
* Input format that breaks the input on newlines, and returns a single column named {@link LinesReader#LINE_COLUMN}.
*/
/**
 * Input format that breaks the input on newlines, and returns a single column named {@link LinesReader#LINE_COLUMN}.
 *
 * This format is stateless: it has no configurable fields, so all instances are interchangeable.
 */
public class LinesInputFormat implements InputFormat
{
  public static final String TYPE_KEY = "lines";

  @JsonCreator
  public LinesInputFormat()
  {
  }

  @Override
  public boolean isSplittable()
  {
    return false;
  }

  @Override
  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
  {
    return new LinesReader(inputRowSchema, source);
  }

  @Override
  public long getWeightedSize(String path, long size)
  {
    // Weight compressed text upwards, since it expands when read.
    return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
  }

  @Override
  public boolean equals(Object o)
  {
    // No configurable state, so any two instances of this exact class are equal. Needed so that
    // serde round-trips and spec comparisons treat equivalent configurations as equal.
    return o != null && o.getClass() == getClass();
  }

  @Override
  public int hashCode()
  {
    return LinesInputFormat.class.hashCode();
  }

  @Override
  public String toString()
  {
    return "LinesInputFormat{}";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.TextReader;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.util.List;
import java.util.Map;

/**
* Reader for {@link LinesInputFormat}.
*/
/**
 * Reader for {@link LinesInputFormat}. Each line of input becomes one row containing a single
 * string column, {@link #LINE_COLUMN}, whose value is the entire line.
 */
public class LinesReader extends TextReader.Strings
{
  /**
   * Name of the single output column. Public (rather than private) so it can be resolved by the
   * {@code {@link}} reference in {@link LinesInputFormat} javadoc, and referenced by callers that
   * need to read the column back out of ingested rows.
   */
  public static final String LINE_COLUMN = "line";

  LinesReader(
      InputRowSchema inputRowSchema,
      InputEntity source
  )
  {
    super(inputRowSchema, source);
  }

  @Override
  public List<InputRow> parseInputRows(String intermediateRow) throws ParseException
  {
    // Each intermediate row is a single line of text; it maps to exactly one InputRow.
    return List.of(MapInputRowParser.parse(getInputRowSchema(), parseLine(intermediateRow)));
  }

  @Override
  protected List<Map<String, Object>> toMap(String intermediateRow)
  {
    return List.of(parseLine(intermediateRow));
  }

  /**
   * Wraps a raw line in the single-column map representation used for each row.
   */
  private Map<String, Object> parseLine(String line)
  {
    return Map.of(LINE_COLUMN, line);
  }

  @Override
  public int getNumHeaderLinesToSkip()
  {
    // Line-oriented input has no header concept.
    return 0;
  }

  @Override
  public boolean needsToProcessHeaderLine()
  {
    return false;
  }

  @Override
  public void processHeaderLine(String line)
  {
    // Nothing to do.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
@Override
public long getWeightedSize(String path, long size)
{
CompressionUtils.Format compressionFormat = CompressionUtils.Format.fromFileName(path);
if (CompressionUtils.Format.GZ == compressionFormat) {
return size * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR;
}
return size;
return size * CompressionUtils.estimatedCompressionFactor(CompressionUtils.Format.fromFileName(path));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can extend CompressionUtils.estimatedCompressionFactor() or add a thin wrapper that accepts the path directly and returns the compression factor - internally invoking CompressionUtils.Format.fromFileName()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO these are equivalently useful for what we need them for right now, so I'll keep it the way I had it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for reviewing!

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.utils.FileNameUtils;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
Expand Down Expand Up @@ -120,7 +121,7 @@ public static Format fromFileName(@Nullable String fileName)
}
}

public static final long COMPRESSED_TEXT_WEIGHT_FACTOR = 4L;
public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4;
private static final Logger log = new Logger(CompressionUtils.class);
private static final int DEFAULT_RETRY_COUNT = 3;
private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512
Expand Down Expand Up @@ -655,4 +656,19 @@ public static InputStream decompress(final InputStream in, final String fileName
return in;
}
}

/**
 * Returns the estimated compression factor for a given format on plain text. This is used by certain implementations
 * of {@link InputFormat#getWeightedSize(String, long)}.
 */
public static int estimatedCompressionFactor(@Nullable final Format format)
{
  // The special-casing of GZ originated in https://github.com/apache/druid/pull/14307 and was later
  // moved here. Other (all?) compression formats could plausibly be given nontrivial factors too.
  return format == Format.GZ ? COMPRESSED_TEXT_WEIGHT_FACTOR : 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.utils.CompressionUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;

public class LinesInputFormatTest
{
  private ObjectMapper mapper;

  @Before
  public void setUp() throws Exception
  {
    mapper = TestHelper.makeJsonMapper();
  }

  @Test
  public void testSerde() throws IOException
  {
    final LinesInputFormat original = new LinesInputFormat();
    final byte[] serialized = mapper.writeValueAsBytes(original);

    // Deserialize as a generic map to verify the type discriminator.
    final Map<String, Object> asMap = mapper.readValue(serialized, Map.class);
    Assert.assertEquals("lines", asMap.get("type"));

    // Deserialize polymorphically and verify the concrete class comes back.
    final InputFormat roundTripped = mapper.readValue(serialized, InputFormat.class);
    MatcherAssert.assertThat(roundTripped, Matchers.instanceOf(LinesInputFormat.class));
  }

  @Test
  public void test_getWeightedSize_withoutCompression()
  {
    // Uncompressed files are weighted 1:1 with their raw size.
    final long rawSize = 100L;
    Assert.assertEquals(rawSize, new LinesInputFormat().getWeightedSize("file.txt", rawSize));
  }

  @Test
  public void test_getWeightedSize_withGzCompression()
  {
    // Gzipped files are weighted up by the compressed-text factor.
    final long rawSize = 100L;
    Assert.assertEquals(
        rawSize * CompressionUtils.COMPRESSED_TEXT_WEIGHT_FACTOR,
        new LinesInputFormat().getWeightedSize("file.txt.gz", rawSize)
    );
  }
}
Loading
Loading