From 1c0a219ea0d38987fb8a868aaed9cbd6e986b6f7 Mon Sep 17 00:00:00 2001 From: Junjie Gu Date: Wed, 20 Nov 2019 16:56:49 -0800 Subject: [PATCH 1/8] add TsvInputFormat --- .../apache/druid/data/input/InputFormat.java | 4 +- .../druid/data/input/impl/TsvInputFormat.java | 156 +++++++++ .../druid/data/input/impl/TsvReader.java | 133 ++++++++ .../druid/data/input/impl/CsvReaderTest.java | 8 + .../data/input/impl/TsvInputFormatTest.java | 54 ++++ .../druid/data/input/impl/TsvReaderTest.java | 302 ++++++++++++++++++ 6 files changed, 656 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/InputFormat.java b/core/src/main/java/org/apache/druid/data/input/InputFormat.java index 2ea04bd1e0b5..9ba850eef708 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/InputFormat.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.data.input.impl.RegexInputFormat; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.data.input.impl.TsvInputFormat; import org.apache.druid.guice.annotations.UnstableApi; import java.io.File; @@ -44,7 +45,8 @@ @JsonSubTypes(value = { @Type(name = "csv", value = CsvInputFormat.class), @Type(name = "json", value = JsonInputFormat.class), - @Type(name = "regex", value = RegexInputFormat.class) + @Type(name = "regex", value = RegexInputFormat.class), + @Type(name = "tsv", value = TsvInputFormat.class) }) public interface InputFormat { diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java new file mode 100644 index 000000000000..e6f84d2dd675 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.indexer.Checks; +import org.apache.druid.indexer.Property; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class TsvInputFormat implements InputFormat +{ + private final String listDelimiter; + private final List columns; + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + + @JsonCreator + public TsvInputFormat( + @JsonProperty("columns") @Nullable List columns, + @JsonProperty("listDelimiter") @Nullable String listDelimiter, + @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow, + @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader, + @JsonProperty("skipHeaderRows") int skipHeaderRows + ) + { + this.listDelimiter = listDelimiter; + this.columns = columns == null ? 
Collections.emptyList() : columns; + //noinspection ConstantConditions + this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("hasHeaderRow", hasHeaderRow), + new Property<>("findColumnsFromHeader", findColumnsFromHeader) + ) + ).getValue(); + this.skipHeaderRows = skipHeaderRows; + + if (!this.columns.isEmpty()) { + for (String column : this.columns) { + Preconditions.checkArgument(!column.contains("\t"), "Column[%s] has a tab, it cannot", column); + } + } else { + Preconditions.checkArgument( + this.findColumnsFromHeader, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." + ); + } + } + + @VisibleForTesting + public TsvInputFormat( + List columns, + String listDelimiter, + boolean findColumnsFromHeader, + int skipHeaderRows + ) + { + this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows); + } + + @JsonProperty + public List getColumns() + { + return columns; + } + + @JsonProperty + public String getListDelimiter() + { + return listDelimiter; + } + + @JsonProperty + public boolean isFindColumnsFromHeader() + { + return findColumnsFromHeader; + } + + @JsonProperty + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + + @Override + public boolean isSplittable() + { + return true; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new TsvReader( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TsvInputFormat format = (TsvInputFormat) o; + return findColumnsFromHeader == format.findColumnsFromHeader && + skipHeaderRows == format.skipHeaderRows && + Objects.equals(listDelimiter, 
format.listDelimiter) && + Objects.equals(columns, format.columns); + } + + @Override + public int hashCode() + { + return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows); + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java new file mode 100644 index 000000000000..ffdcd9319e24 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.opencsv.RFC4180Parser; +import com.opencsv.RFC4180ParserBuilder; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.TextReader; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.common.parsers.ParserUtils; +import org.apache.druid.java.util.common.parsers.Parsers; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class TsvReader extends TextReader +{ + private final RFC4180Parser parser = new RFC4180ParserBuilder() + .withSeparator('\t') + .build(); + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + private final Function multiValueFunction; + @Nullable + private List columns; + + TsvReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + @Nullable String listDelimiter, + @Nullable List columns, + boolean findColumnsFromHeader, + int skipHeaderRows + ) + { + super(inputRowSchema, source, temporaryDirectory); + this.findColumnsFromHeader = findColumnsFromHeader; + this.skipHeaderRows = skipHeaderRows; + final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; + this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); + this.columns = findColumnsFromHeader ? 
null : columns; // columns will be overriden by header row + + if (this.columns != null) { + for (String column : this.columns) { + Preconditions.checkArgument(!column.contains("\t"), "Column[%s] has a tab, it cannot", column); + } + } else { + Preconditions.checkArgument( + findColumnsFromHeader, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." + ); + } + } + + @Override + public List parseInputRows(String line) throws IOException, ParseException + { + final Map zipped = parseLine(line); + return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped)); + } + + @Override + public String toJson(String intermediateRow) throws IOException + { + final Map zipped = parseLine(intermediateRow); + return DEFAULT_JSON_WRITER.writeValueAsString(zipped); + } + + private Map parseLine(String line) throws IOException + { + final String[] parsed = parser.parseLine(line); + return Utils.zipMapPartial( + Preconditions.checkNotNull(columns, "columns"), + Iterables.transform(Arrays.asList(parsed), multiValueFunction) + ); + } + + @Override + public int getNumHeaderLinesToSkip() + { + return skipHeaderRows; + } + + @Override + public boolean needsToProcessHeaderLine() + { + return findColumnsFromHeader; + } + + @Override + public void processHeaderLine(String line) throws IOException + { + if (!findColumnsFromHeader) { + throw new ISE("Don't call this if findColumnsFromHeader = false"); + } + columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line))); + if (columns.isEmpty()) { + throw new ISE("Empty columns"); + } + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java index 653fcc4c114b..9d7bae04ce58 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java @@ 
-22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; @@ -30,6 +31,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import java.io.ByteArrayOutputStream; @@ -48,6 +50,12 @@ public class CsvReaderTest Collections.emptyList() ); + @BeforeClass + public static void setup() + { + NullHandling.initializeForTests(); + } + @Test public void testWithoutHeaders() throws IOException { diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java new file mode 100644 index 000000000000..0abfb8a94f68 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputFormat; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.Collections; + +public class TsvInputFormatTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new ObjectMapper(); + final TsvInputFormat format = new TsvInputFormat(Collections.singletonList("a"), "|", true, 10); + final byte[] bytes = mapper.writeValueAsBytes(format); + final TsvInputFormat fromJson = (TsvInputFormat) mapper.readValue(bytes, InputFormat.class); + Assert.assertEquals(format, fromJson); + } + + @Test + public void testComma() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Column[a\t] has a tab, it cannot"); + new TsvInputFormat(Collections.singletonList("a\t"), ",", false, 0); + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java new file mode 100644 index 000000000000..f3df33ebf8e8 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +public class TsvReaderTest +{ + private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))), + Collections.emptyList() + ); + + @BeforeClass + public static void setup() + { + NullHandling.initializeForTests(); + } + + @Test + public void testWithoutHeaders() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new 
TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 0); + assertResult(source, format); + } + + @Test + public void testFindColumn() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "ts\tname\tscore", + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, true, 0); + assertResult(source, format); + } + + @Test + public void testSkipHeaders() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "this\tis\ta\trow\tto\tskip", + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 1); + assertResult(source, format); + } + + @Test + public void testFindColumnAndSkipHeaders() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "this\tis\ta\trow\tto\tskip", + "ts\tname\tscore", + "2019-01-01T00:00:10Z\tname_1\t5", + "2019-01-01T00:00:20Z\tname_2\t10", + "2019-01-01T00:00:30Z\tname_3\t15" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, true, 1); + assertResult(source, format); + } + + @Test + public void testMultiValues() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "ts\tname\tscore", + "2019-01-01T00:00:10Z\tname_1\t5|1", + "2019-01-01T00:00:20Z\tname_2\t10|2", + "2019-01-01T00:00:30Z\tname_3\t15|3" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), "|", true, 0); + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); + int numResults = 0; + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + final InputRow row = iterator.next(); + Assert.assertEquals( + 
DateTimes.of(StringUtils.format("2019-01-01T00:00:%02dZ", (numResults + 1) * 10)), + row.getTimestamp() + ); + Assert.assertEquals( + StringUtils.format("name_%d", numResults + 1), + Iterables.getOnlyElement(row.getDimension("name")) + ); + Assert.assertEquals( + ImmutableList.of(Integer.toString((numResults + 1) * 5), Integer.toString(numResults + 1)), + row.getDimension("score") + ); + numResults++; + } + Assert.assertEquals(3, numResults); + } + } + + @Test + public void testQuotes() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "3\t\"Lets do some \"\"normal\"\" quotes\"\t2018-05-05T10:00:00Z", + "34\t\"Lets do some \"\"normal\"\", quotes with comma\"\t2018-05-06T10:00:00Z", + "343\t\"Lets try \\\"\"it\\\"\" with slash quotes\"\t2018-05-07T10:00:00Z", + "545\t\"Lets try \\\"\"it\\\"\", with slash quotes and comma\"\t2018-05-08T10:00:00Z", + "65\tHere I write \\n slash n\t2018-05-09T10:00:00Z" + ) + ); + final List expectedResults = ImmutableList.of( + new MapBasedInputRow( + DateTimes.of("2018-05-05T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "3", + "Comment", + "Lets do some \"normal\" quotes", + "Timestamp", + "2018-05-05T10:00:00Z" + ) + ), + new MapBasedInputRow( + DateTimes.of("2018-05-06T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "34", + "Comment", + "Lets do some \"normal\", quotes with comma", + "Timestamp", + "2018-05-06T10:00:00Z" + ) + ), + new MapBasedInputRow( + DateTimes.of("2018-05-07T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "343", + "Comment", + "Lets try \\\"it\\\" with slash quotes", + "Timestamp", + "2018-05-07T10:00:00Z" + ) + ), + new MapBasedInputRow( + DateTimes.of("2018-05-08T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of( + "Value", + "545", + "Comment", + "Lets try \\\"it\\\", with slash quotes and comma", + "Timestamp", + "2018-05-08T10:00:00Z" + ) + ), + new MapBasedInputRow( 
+ DateTimes.of("2018-05-09T10:00:00Z"), + ImmutableList.of("Timestamp"), + ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z") + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("Value", "Comment", "Timestamp"), null, false, 0); + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("Timestamp", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))), + Collections.emptyList() + ), + source, + null + ); + + try (CloseableIterator iterator = reader.read()) { + final Iterator expectedRowIterator = expectedResults.iterator(); + while (iterator.hasNext()) { + Assert.assertTrue(expectedRowIterator.hasNext()); + Assert.assertEquals(expectedRowIterator.next(), iterator.next()); + } + } + } + + @Test + public void testRussianTextMess() throws IOException + { + final ByteEntity source = writeData( + ImmutableList.of( + "2019-01-01T00:00:10Z\tname_1\t\"Как говорится: \\\"\"всё течет\t всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\"" + ) + ); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0); + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); + try (CloseableIterator iterator = reader.read()) { + Assert.assertTrue(iterator.hasNext()); + final InputRow row = iterator.next(); + Assert.assertEquals(DateTimes.of("2019-01-01T00:00:10Z"), row.getTimestamp()); + Assert.assertEquals("name_1", Iterables.getOnlyElement(row.getDimension("name"))); + Assert.assertEquals( + "Как говорится: \\\"всё течет\t всё изменяется\\\". Украина как всегда обвиняет Россию в собственных проблемах. 
#ПровокацияКиева", + Iterables.getOnlyElement(row.getDimension("Comment")) + ); + Assert.assertFalse(iterator.hasNext()); + } + } + + private ByteEntity writeData(List lines) throws IOException + { + final List byteLines = lines.stream() + .map(line -> StringUtils.toUtf8(line + "\n")) + .collect(Collectors.toList()); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream( + byteLines.stream().mapToInt(bytes -> bytes.length).sum() + ); + for (byte[] bytes : byteLines) { + outputStream.write(bytes); + } + return new ByteEntity(outputStream.toByteArray()); + } + + private void assertResult(ByteEntity source, TsvInputFormat format) throws IOException + { + final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); + int numResults = 0; + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + final InputRow row = iterator.next(); + Assert.assertEquals( + DateTimes.of(StringUtils.format("2019-01-01T00:00:%02dZ", (numResults + 1) * 10)), + row.getTimestamp() + ); + Assert.assertEquals( + StringUtils.format("name_%d", numResults + 1), + Iterables.getOnlyElement(row.getDimension("name")) + ); + Assert.assertEquals( + Integer.toString((numResults + 1) * 5), + Iterables.getOnlyElement(row.getDimension("score")) + ); + numResults++; + } + Assert.assertEquals(3, numResults); + } + } +} From 659028260a693a000b7c4040c880de2e3898d696 Mon Sep 17 00:00:00 2001 From: Junjie Gu Date: Wed, 20 Nov 2019 19:51:25 -0800 Subject: [PATCH 2/8] refactor code --- .../druid/data/input/impl/CsvInputFormat.java | 107 +---------- .../druid/data/input/impl/CsvReader.java | 108 +---------- .../input/impl/SeperateValueInputFormat.java | 179 ++++++++++++++++++ .../data/input/impl/SeperateValueReader.java | 152 +++++++++++++++ .../druid/data/input/impl/TsvInputFormat.java | 107 +---------- .../druid/data/input/impl/TsvReader.java | 101 +--------- .../java/util/common/parsers/CSVParser.java | 4 +- 
.../data/input/impl/TsvInputFormatTest.java | 4 +- 8 files changed, 369 insertions(+), 393 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/SeperateValueInputFormat.java create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/SeperateValueReader.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index cf2a301a69e3..d49fabb1bb37 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -22,28 +22,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.indexer.Checks; -import org.apache.druid.indexer.Property; import javax.annotation.Nullable; import java.io.File; -import java.util.Collections; import java.util.List; -import java.util.Objects; -public class CsvInputFormat implements InputFormat +public class CsvInputFormat extends SeperateValueInputFormat { - private final String listDelimiter; - private final List columns; - private final boolean findColumnsFromHeader; - private final int skipHeaderRows; - @JsonCreator public CsvInputFormat( @JsonProperty("columns") @Nullable List columns, @@ -53,28 +41,13 @@ public CsvInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - this.listDelimiter = listDelimiter; - this.columns = columns == null ? 
Collections.emptyList() : columns; - //noinspection ConstantConditions - this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("hasHeaderRow", hasHeaderRow), - new Property<>("findColumnsFromHeader", findColumnsFromHeader) - ) - ).getValue(); - this.skipHeaderRows = skipHeaderRows; + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, "comma"); + } - if (!this.columns.isEmpty()) { - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } - } else { - Preconditions.checkArgument( - this.findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." - ); - } + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return super.createReader(inputRowSchema, source, temporaryDirectory, "comma"); } @VisibleForTesting @@ -87,70 +60,4 @@ public CsvInputFormat( { this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows); } - - @JsonProperty - public List getColumns() - { - return columns; - } - - @JsonProperty - public String getListDelimiter() - { - return listDelimiter; - } - - @JsonProperty - public boolean isFindColumnsFromHeader() - { - return findColumnsFromHeader; - } - - @JsonProperty - public int getSkipHeaderRows() - { - return skipHeaderRows; - } - - @Override - public boolean isSplittable() - { - return true; - } - - @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) - { - return new CsvReader( - inputRowSchema, - source, - temporaryDirectory, - listDelimiter, - columns, - findColumnsFromHeader, - skipHeaderRows - ); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { 
- return false; - } - CsvInputFormat format = (CsvInputFormat) o; - return findColumnsFromHeader == format.findColumnsFromHeader && - skipHeaderRows == format.skipHeaderRows && - Objects.equals(listDelimiter, format.listDelimiter) && - Objects.equals(columns, format.columns); - } - - @Override - public int hashCode() - { - return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows); - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java index 966ce6958bdb..d8f4027e4209 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java @@ -19,49 +19,15 @@ package org.apache.druid.data.input.impl; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.opencsv.RFC4180Parser; -import com.opencsv.RFC4180ParserBuilder; -import com.opencsv.enums.CSVReaderNullFieldIndicator; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.TextReader; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.collect.Utils; -import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.common.parsers.ParserUtils; -import org.apache.druid.java.util.common.parsers.Parsers; import javax.annotation.Nullable; import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; -public class CsvReader extends TextReader +public class CsvReader extends SeperateValueReader { - private final RFC4180Parser parser = 
CsvReader.createOpenCsvParser(); - private final boolean findColumnsFromHeader; - private final int skipHeaderRows; - private final Function multiValueFunction; - @Nullable - private List columns; - - public static RFC4180Parser createOpenCsvParser() - { - return NullHandling.replaceWithDefault() - ? new RFC4180Parser() - : new RFC4180ParserBuilder().withFieldAsNull( - CSVReaderNullFieldIndicator.EMPTY_SEPARATORS).build(); - } - CsvReader( InputRowSchema inputRowSchema, InputEntity source, @@ -72,69 +38,15 @@ public static RFC4180Parser createOpenCsvParser() int skipHeaderRows ) { - super(inputRowSchema, source, temporaryDirectory); - this.findColumnsFromHeader = findColumnsFromHeader; - this.skipHeaderRows = skipHeaderRows; - final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; - this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); - this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row - - if (this.columns != null) { - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } - } else { - Preconditions.checkArgument( - findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." 
- ); - } - } - - @Override - public List parseInputRows(String line) throws IOException, ParseException - { - final Map zipped = parseLine(line); - return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped)); - } - - @Override - public Map toMap(String intermediateRow) throws IOException - { - return parseLine(intermediateRow); - } - - private Map parseLine(String line) throws IOException - { - final String[] parsed = parser.parseLine(line); - return Utils.zipMapPartial( - Preconditions.checkNotNull(columns, "columns"), - Iterables.transform(Arrays.asList(parsed), multiValueFunction) + super( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows, + "comma" ); } - - @Override - public int getNumHeaderLinesToSkip() - { - return skipHeaderRows; - } - - @Override - public boolean needsToProcessHeaderLine() - { - return findColumnsFromHeader; - } - - @Override - public void processHeaderLine(String line) throws IOException - { - if (!findColumnsFromHeader) { - throw new ISE("Don't call this if findColumnsFromHeader = false"); - } - columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line))); - if (columns.isEmpty()) { - throw new ISE("Empty columns"); - } - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueInputFormat.java new file mode 100644 index 000000000000..b246457b1e3c --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueInputFormat.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.indexer.Checks; +import org.apache.druid.indexer.Property; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class SeperateValueInputFormat implements InputFormat +{ + private final String listDelimiter; + private final List columns; + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + private final String seperator; + + @JsonCreator + public SeperateValueInputFormat( + @JsonProperty("columns") @Nullable List columns, + @JsonProperty("listDelimiter") @Nullable String listDelimiter, + @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow, + @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader, + @JsonProperty("skipHeaderRows") int skipHeaderRows, + String seperator + ) + { + this.listDelimiter = listDelimiter; + this.columns = columns 
== null ? Collections.emptyList() : columns; + //noinspection ConstantConditions + this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("hasHeaderRow", hasHeaderRow), + new Property<>("findColumnsFromHeader", findColumnsFromHeader) + ) + ).getValue(); + this.skipHeaderRows = skipHeaderRows; + this.seperator = seperator; + + if (!this.columns.isEmpty()) { + for (String column : this.columns) { + Preconditions.checkArgument( + !column.contains("tab".equals(seperator) ? "\t" : ","), + "Column[%s] has a " + this.seperator + ", it cannot", + column + ); + } + } else { + Preconditions.checkArgument( + this.findColumnsFromHeader, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." + ); + } + } + + + @JsonProperty + public List getColumns() + { + return columns; + } + + @JsonProperty + public String getListDelimiter() + { + return listDelimiter; + } + + @JsonProperty + public boolean isFindColumnsFromHeader() + { + return findColumnsFromHeader; + } + + @JsonProperty + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + + @Override + public boolean isSplittable() + { + return true; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new SeperateValueReader( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows, + seperator + ); + } + + public InputEntityReader createReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + String seperator + ) + { + return "tab".equals(seperator) ? 
new TsvReader( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows + ) : new CsvReader( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SeperateValueInputFormat format = (SeperateValueInputFormat) o; + return findColumnsFromHeader == format.findColumnsFromHeader && + skipHeaderRows == format.skipHeaderRows && + Objects.equals(listDelimiter, format.listDelimiter) && + Objects.equals(columns, format.columns); + } + + @Override + public int hashCode() + { + return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows); + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueReader.java new file mode 100644 index 000000000000..ef8cff8ca893 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueReader.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.opencsv.RFC4180Parser; +import com.opencsv.RFC4180ParserBuilder; +import com.opencsv.enums.CSVReaderNullFieldIndicator; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.TextReader; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.collect.Utils; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.java.util.common.parsers.ParserUtils; +import org.apache.druid.java.util.common.parsers.Parsers; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class SeperateValueReader extends TextReader +{ + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + private final Function multiValueFunction; + @Nullable + private List columns; + private final String seperator; + private final RFC4180Parser parser; + + public static RFC4180Parser createOpenCsvParser(char seperator) + { + return NullHandling.replaceWithDefault() + ? 
new RFC4180ParserBuilder() + .withSeparator(seperator) + .build() + : new RFC4180ParserBuilder().withFieldAsNull( + CSVReaderNullFieldIndicator.EMPTY_SEPARATORS) + .withSeparator(seperator) + .build(); + } + + SeperateValueReader( + InputRowSchema inputRowSchema, + InputEntity source, + File temporaryDirectory, + @Nullable String listDelimiter, + @Nullable List columns, + boolean findColumnsFromHeader, + int skipHeaderRows, + String seperator + ) + { + super(inputRowSchema, source, temporaryDirectory); + this.findColumnsFromHeader = findColumnsFromHeader; + this.skipHeaderRows = skipHeaderRows; + final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; + this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); + this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row + this.seperator = seperator; + this.parser = createOpenCsvParser("tab".equals(seperator) ? '\t' : ','); + + if (this.columns != null) { + for (String column : this.columns) { + Preconditions.checkArgument( + !column.contains("tab".equals(seperator) ? "\t" : ","), + "Column[%s] has a " + this.seperator + ", it cannot", + column + ); + } + } else { + Preconditions.checkArgument( + findColumnsFromHeader, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." 
+ ); + } + } + + @Override + public List parseInputRows(String line) throws IOException, ParseException + { + final Map zipped = parseLine(line); + return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped)); + } + + @Override + public Map toMap(String intermediateRow) throws IOException + { + return parseLine(intermediateRow); + } + + private Map parseLine(String line) throws IOException + { + final String[] parsed = parser.parseLine(line); + return Utils.zipMapPartial( + Preconditions.checkNotNull(columns, "columns"), + Iterables.transform(Arrays.asList(parsed), multiValueFunction) + ); + } + + @Override + public int getNumHeaderLinesToSkip() + { + return skipHeaderRows; + } + + @Override + public boolean needsToProcessHeaderLine() + { + return findColumnsFromHeader; + } + + @Override + public void processHeaderLine(String line) throws IOException + { + if (!findColumnsFromHeader) { + throw new ISE("Don't call this if findColumnsFromHeader = false"); + } + columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line))); + if (columns.isEmpty()) { + throw new ISE("Empty columns"); + } + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java index e6f84d2dd675..714ba14c8eb8 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -22,28 +22,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; -import 
org.apache.druid.indexer.Checks; -import org.apache.druid.indexer.Property; import javax.annotation.Nullable; import java.io.File; -import java.util.Collections; import java.util.List; -import java.util.Objects; -public class TsvInputFormat implements InputFormat +public class TsvInputFormat extends SeperateValueInputFormat { - private final String listDelimiter; - private final List columns; - private final boolean findColumnsFromHeader; - private final int skipHeaderRows; - @JsonCreator public TsvInputFormat( @JsonProperty("columns") @Nullable List columns, @@ -53,28 +41,13 @@ public TsvInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - this.listDelimiter = listDelimiter; - this.columns = columns == null ? Collections.emptyList() : columns; - //noinspection ConstantConditions - this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("hasHeaderRow", hasHeaderRow), - new Property<>("findColumnsFromHeader", findColumnsFromHeader) - ) - ).getValue(); - this.skipHeaderRows = skipHeaderRows; + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, "tab"); + } - if (!this.columns.isEmpty()) { - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains("\t"), "Column[%s] has a tab, it cannot", column); - } - } else { - Preconditions.checkArgument( - this.findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." 
- ); - } + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return super.createReader(inputRowSchema, source, temporaryDirectory, "tab"); } @VisibleForTesting @@ -87,70 +60,4 @@ public TsvInputFormat( { this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows); } - - @JsonProperty - public List getColumns() - { - return columns; - } - - @JsonProperty - public String getListDelimiter() - { - return listDelimiter; - } - - @JsonProperty - public boolean isFindColumnsFromHeader() - { - return findColumnsFromHeader; - } - - @JsonProperty - public int getSkipHeaderRows() - { - return skipHeaderRows; - } - - @Override - public boolean isSplittable() - { - return true; - } - - @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) - { - return new TsvReader( - inputRowSchema, - source, - temporaryDirectory, - listDelimiter, - columns, - findColumnsFromHeader, - skipHeaderRows - ); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TsvInputFormat format = (TsvInputFormat) o; - return findColumnsFromHeader == format.findColumnsFromHeader && - skipHeaderRows == format.skipHeaderRows && - Objects.equals(listDelimiter, format.listDelimiter) && - Objects.equals(columns, format.columns); - } - - @Override - public int hashCode() - { - return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows); - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java index ffdcd9319e24..07d20704857d 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java @@ -19,41 +19,15 @@ package 
org.apache.druid.data.input.impl; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.opencsv.RFC4180Parser; -import com.opencsv.RFC4180ParserBuilder; import org.apache.druid.data.input.InputEntity; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.TextReader; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.collect.Utils; -import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.common.parsers.ParserUtils; -import org.apache.druid.java.util.common.parsers.Parsers; import javax.annotation.Nullable; import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; -public class TsvReader extends TextReader +public class TsvReader extends SeperateValueReader { - private final RFC4180Parser parser = new RFC4180ParserBuilder() - .withSeparator('\t') - .build(); - private final boolean findColumnsFromHeader; - private final int skipHeaderRows; - private final Function multiValueFunction; - @Nullable - private List columns; - TsvReader( InputRowSchema inputRowSchema, InputEntity source, @@ -64,70 +38,15 @@ public class TsvReader extends TextReader int skipHeaderRows ) { - super(inputRowSchema, source, temporaryDirectory); - this.findColumnsFromHeader = findColumnsFromHeader; - this.skipHeaderRows = skipHeaderRows; - final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; - this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); - this.columns = findColumnsFromHeader ? 
null : columns; // columns will be overriden by header row - - if (this.columns != null) { - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains("\t"), "Column[%s] has a tab, it cannot", column); - } - } else { - Preconditions.checkArgument( - findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." - ); - } - } - - @Override - public List parseInputRows(String line) throws IOException, ParseException - { - final Map zipped = parseLine(line); - return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped)); - } - - @Override - public String toJson(String intermediateRow) throws IOException - { - final Map zipped = parseLine(intermediateRow); - return DEFAULT_JSON_WRITER.writeValueAsString(zipped); - } - - private Map parseLine(String line) throws IOException - { - final String[] parsed = parser.parseLine(line); - return Utils.zipMapPartial( - Preconditions.checkNotNull(columns, "columns"), - Iterables.transform(Arrays.asList(parsed), multiValueFunction) + super( + inputRowSchema, + source, + temporaryDirectory, + listDelimiter, + columns, + findColumnsFromHeader, + skipHeaderRows, + "tab" ); } - - @Override - public int getNumHeaderLinesToSkip() - { - return skipHeaderRows; - } - - @Override - public boolean needsToProcessHeaderLine() - { - return findColumnsFromHeader; - } - - @Override - public void processHeaderLine(String line) throws IOException - { - if (!findColumnsFromHeader) { - throw new ISE("Don't call this if findColumnsFromHeader = false"); - } - columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line))); - if (columns.isEmpty()) { - throw new ISE("Empty columns"); - } - } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java index 17eba147564b..a0a5d1289267 100644 --- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java @@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.opencsv.RFC4180Parser; -import org.apache.druid.data.input.impl.CsvReader; +import org.apache.druid.data.input.impl.SeperateValueReader; import javax.annotation.Nullable; import java.io.IOException; @@ -30,7 +30,7 @@ public class CSVParser extends AbstractFlatTextFormatParser { - private final RFC4180Parser parser = CsvReader.createOpenCsvParser(); + private final RFC4180Parser parser = SeperateValueReader.createOpenCsvParser(','); public CSVParser( @Nullable final String listDelimiter, diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java index 0abfb8a94f68..c0cea99d31e0 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java @@ -45,10 +45,10 @@ public void testSerde() throws IOException } @Test - public void testComma() + public void testTab() { expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Column[a\t] has a tab, it cannot"); + expectedException.expectMessage("Column[a,] has a tab, it cannot"); new TsvInputFormat(Collections.singletonList("a\t"), ",", false, 0); } } From f025bff05635dab41c77130dbbb0bf05d15b6384 Mon Sep 17 00:00:00 2001 From: Junjie Gu Date: Thu, 21 Nov 2019 10:24:57 -0800 Subject: [PATCH 3/8] fix grammar --- .../org/apache/druid/data/input/impl/CsvInputFormat.java | 2 +- .../java/org/apache/druid/data/input/impl/CsvReader.java | 2 +- ...alueInputFormat.java => SeparateValueInputFormat.java} | 8 ++++---- ...{SeperateValueReader.java => SeparateValueReader.java} | 4 ++-- .../org/apache/druid/data/input/impl/TsvInputFormat.java | 2 +- 
.../java/org/apache/druid/data/input/impl/TsvReader.java | 2 +- .../apache/druid/java/util/common/parsers/CSVParser.java | 4 ++-- .../apache/druid/data/input/impl/TsvInputFormatTest.java | 2 +- 8 files changed, 13 insertions(+), 13 deletions(-) rename core/src/main/java/org/apache/druid/data/input/impl/{SeperateValueInputFormat.java => SeparateValueInputFormat.java} (96%) rename core/src/main/java/org/apache/druid/data/input/impl/{SeperateValueReader.java => SeparateValueReader.java} (98%) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index d49fabb1bb37..ac447871968c 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -30,7 +30,7 @@ import java.io.File; import java.util.List; -public class CsvInputFormat extends SeperateValueInputFormat +public class CsvInputFormat extends SeparateValueInputFormat { @JsonCreator public CsvInputFormat( diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java index d8f4027e4209..1c420f9046a2 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java @@ -26,7 +26,7 @@ import java.io.File; import java.util.List; -public class CsvReader extends SeperateValueReader +public class CsvReader extends SeparateValueReader { CsvReader( InputRowSchema inputRowSchema, diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java similarity index 96% rename from core/src/main/java/org/apache/druid/data/input/impl/SeperateValueInputFormat.java rename to 
core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java index b246457b1e3c..982ce676f6c2 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java @@ -36,7 +36,7 @@ import java.util.List; import java.util.Objects; -public class SeperateValueInputFormat implements InputFormat +public class SeparateValueInputFormat implements InputFormat { private final String listDelimiter; private final List columns; @@ -45,7 +45,7 @@ public class SeperateValueInputFormat implements InputFormat private final String seperator; @JsonCreator - public SeperateValueInputFormat( + public SeparateValueInputFormat( @JsonProperty("columns") @Nullable List columns, @JsonProperty("listDelimiter") @Nullable String listDelimiter, @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow, @@ -117,7 +117,7 @@ public boolean isSplittable() @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return new SeperateValueReader( + return new SeparateValueReader( inputRowSchema, source, temporaryDirectory, @@ -164,7 +164,7 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - SeperateValueInputFormat format = (SeperateValueInputFormat) o; + SeparateValueInputFormat format = (SeparateValueInputFormat) o; return findColumnsFromHeader == format.findColumnsFromHeader && skipHeaderRows == format.skipHeaderRows && Objects.equals(listDelimiter, format.listDelimiter) && diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java similarity index 98% rename from core/src/main/java/org/apache/druid/data/input/impl/SeperateValueReader.java rename to core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java 
index ef8cff8ca893..83e7971a87cf 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeperateValueReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java @@ -45,7 +45,7 @@ import java.util.List; import java.util.Map; -public class SeperateValueReader extends TextReader +public class SeparateValueReader extends TextReader { private final boolean findColumnsFromHeader; private final int skipHeaderRows; @@ -67,7 +67,7 @@ public static RFC4180Parser createOpenCsvParser(char seperator) .build(); } - SeperateValueReader( + SeparateValueReader( InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory, diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java index 714ba14c8eb8..f75593965979 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -30,7 +30,7 @@ import java.io.File; import java.util.List; -public class TsvInputFormat extends SeperateValueInputFormat +public class TsvInputFormat extends SeparateValueInputFormat { @JsonCreator public TsvInputFormat( diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java index 07d20704857d..6650057bb48f 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java @@ -26,7 +26,7 @@ import java.io.File; import java.util.List; -public class TsvReader extends SeperateValueReader +public class TsvReader extends SeparateValueReader { TsvReader( InputRowSchema inputRowSchema, diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java index a0a5d1289267..d026ad4eaed3 
100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java @@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.opencsv.RFC4180Parser; -import org.apache.druid.data.input.impl.SeperateValueReader; +import org.apache.druid.data.input.impl.SeparateValueReader; import javax.annotation.Nullable; import java.io.IOException; @@ -30,7 +30,7 @@ public class CSVParser extends AbstractFlatTextFormatParser { - private final RFC4180Parser parser = SeperateValueReader.createOpenCsvParser(','); + private final RFC4180Parser parser = SeparateValueReader.createOpenCsvParser(','); public CSVParser( @Nullable final String listDelimiter, diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java index c0cea99d31e0..f5afda470a7b 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java @@ -48,7 +48,7 @@ public void testTab() { expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Column[a,] has a tab, it cannot"); + expectedException.expectMessage("Column[a\t] has a tab, it cannot"); new TsvInputFormat(Collections.singletonList("a\t"), ",", false, 0); } } From 86ad997daec3531e2bb9b87801ae9ba5989c58ab Mon Sep 17 00:00:00 2001 From: Junjie Gu Date: Thu, 21 Nov 2019 11:27:16 -0800 Subject: [PATCH 4/8] use enum to replace string literal --- .../druid/data/input/impl/CsvInputFormat.java | 4 +- .../druid/data/input/impl/CsvReader.java | 2 +- .../input/impl/SeparateValueInputFormat.java | 41 +++++++++++++++---- .../data/input/impl/SeparateValueReader.java | 18 ++++---- .../druid/data/input/impl/TsvInputFormat.java | 4 +-
.../druid/data/input/impl/TsvReader.java | 2 +- 6 files changed, 48 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index ac447871968c..0f421bf74750 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -41,13 +41,13 @@ public CsvInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, "comma"); + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, FlatTextFormat.CSV); } @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return super.createReader(inputRowSchema, source, temporaryDirectory, "comma"); + return super.createReader(inputRowSchema, source, temporaryDirectory, FlatTextFormat.CSV); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java index 1c420f9046a2..cb3cc77718bc 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java @@ -46,7 +46,7 @@ public class CsvReader extends SeparateValueReader columns, findColumnsFromHeader, skipHeaderRows, - "comma" + SeparateValueInputFormat.FlatTextFormat.CSV ); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java index 982ce676f6c2..915992cc2123 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java @@ -38,11 
+38,36 @@ public class SeparateValueInputFormat implements InputFormat { + + public enum FlatTextFormat + { + CSV(","), + TSV("\t"); + + private final String defaultDelimiter; + + FlatTextFormat(String defaultDelimiter) + { + this.defaultDelimiter = defaultDelimiter; + } + + public String getDefaultDelimiter() + { + + return defaultDelimiter; + } + + public String getLiteral() + { + return ",".equals(defaultDelimiter) ? "comma" : "tab"; + } + } + private final String listDelimiter; private final List columns; private final boolean findColumnsFromHeader; private final int skipHeaderRows; - private final String seperator; + private final FlatTextFormat format; @JsonCreator public SeparateValueInputFormat( @@ -51,7 +76,7 @@ public SeparateValueInputFormat( @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow, @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader, @JsonProperty("skipHeaderRows") int skipHeaderRows, - String seperator + FlatTextFormat format ) { this.listDelimiter = listDelimiter; @@ -64,13 +89,13 @@ public SeparateValueInputFormat( ) ).getValue(); this.skipHeaderRows = skipHeaderRows; - this.seperator = seperator; + this.format = format; if (!this.columns.isEmpty()) { for (String column : this.columns) { Preconditions.checkArgument( - !column.contains("tab".equals(seperator) ? "\t" : ","), - "Column[%s] has a " + this.seperator + ", it cannot", + !column.contains(format.getDefaultDelimiter()), + "Column[%s] has a " + format.getLiteral() + ", it cannot", column ); } @@ -125,7 +150,7 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity columns, findColumnsFromHeader, skipHeaderRows, - seperator + format ); } @@ -133,10 +158,10 @@ public InputEntityReader createReader( InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory, - String seperator + FlatTextFormat format ) { - return "tab".equals(seperator) ? new TsvReader( + return format == FlatTextFormat.TSV ? 
new TsvReader( inputRowSchema, source, temporaryDirectory, diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java index 83e7971a87cf..38bbe0fc24c5 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java @@ -52,18 +52,18 @@ public class SeparateValueReader extends TextReader private final Function multiValueFunction; @Nullable private List columns; - private final String seperator; + private final SeparateValueInputFormat.FlatTextFormat format; private final RFC4180Parser parser; - public static RFC4180Parser createOpenCsvParser(char seperator) + public static RFC4180Parser createOpenCsvParser(char separator) { return NullHandling.replaceWithDefault() ? new RFC4180ParserBuilder() - .withSeparator(seperator) + .withSeparator(separator) .build() : new RFC4180ParserBuilder().withFieldAsNull( CSVReaderNullFieldIndicator.EMPTY_SEPARATORS) - .withSeparator(seperator) + .withSeparator(separator) .build(); } @@ -75,7 +75,7 @@ public static RFC4180Parser createOpenCsvParser(char seperator) @Nullable List columns, boolean findColumnsFromHeader, int skipHeaderRows, - String seperator + SeparateValueInputFormat.FlatTextFormat format ) { super(inputRowSchema, source, temporaryDirectory); @@ -84,14 +84,14 @@ public static RFC4180Parser createOpenCsvParser(char seperator) final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row - this.seperator = seperator; - this.parser = createOpenCsvParser("tab".equals(seperator) ? 
'\t' : ','); + this.format = format; + this.parser = createOpenCsvParser(format.getDefaultDelimiter().charAt(0)); if (this.columns != null) { for (String column : this.columns) { Preconditions.checkArgument( - !column.contains("tab".equals(seperator) ? "\t" : ","), - "Column[%s] has a " + this.seperator + ", it cannot", + !column.contains(format.getDefaultDelimiter()), + "Column[%s] has a " + format.getLiteral() + ", it cannot", column ); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java index f75593965979..0a92d68c1fd7 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -41,13 +41,13 @@ public TsvInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, "tab"); + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, FlatTextFormat.TSV); } @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return super.createReader(inputRowSchema, source, temporaryDirectory, "tab"); + return super.createReader(inputRowSchema, source, temporaryDirectory, FlatTextFormat.TSV); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java index 6650057bb48f..ea5337cda4de 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java @@ -46,7 +46,7 @@ public class TsvReader extends SeparateValueReader columns, findColumnsFromHeader, skipHeaderRows, - "tab" + SeparateValueInputFormat.FlatTextFormat.TSV ); } } From 4c69223cb59200c73d82d3027341a646e8a36770 Mon Sep 17 00:00:00 
2001 From: Junjie Gu Date: Fri, 22 Nov 2019 11:22:53 -0800 Subject: [PATCH 5/8] code refactor --- .../druid/data/input/impl/CsvInputFormat.java | 10 ---- .../input/impl/SeparateValueInputFormat.java | 47 +++++++------------ .../data/input/impl/SeparateValueReader.java | 4 +- .../druid/data/input/impl/TsvInputFormat.java | 10 ---- 4 files changed, 19 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 0f421bf74750..e0b488739240 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -22,12 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; -import org.apache.druid.data.input.InputEntity; -import org.apache.druid.data.input.InputEntityReader; -import org.apache.druid.data.input.InputRowSchema; import javax.annotation.Nullable; -import java.io.File; import java.util.List; public class CsvInputFormat extends SeparateValueInputFormat @@ -44,12 +40,6 @@ public CsvInputFormat( super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, FlatTextFormat.CSV); } - @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) - { - return super.createReader(inputRowSchema, source, temporaryDirectory, FlatTextFormat.CSV); - } - @VisibleForTesting public CsvInputFormat( List columns, diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java index 915992cc2123..e104e1bcbd17 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java +++ 
b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java @@ -41,25 +41,31 @@ public class SeparateValueInputFormat implements InputFormat public enum FlatTextFormat { - CSV(","), - TSV("\t"); + CSV(',', "comma"), + TSV('\t', "tab"); - private final String defaultDelimiter; + private final char delimiter; + private final String literal; - FlatTextFormat(String defaultDelimiter) + FlatTextFormat(char delimiter, String literal) { - this.defaultDelimiter = defaultDelimiter; + this.delimiter = delimiter; + this.literal = literal; } - public String getDefaultDelimiter() + public String getDelimiterAsString() { + return String.valueOf(delimiter); + } - return defaultDelimiter; + public char getDelimiter() + { + return delimiter; } public String getLiteral() { - return ",".equals(defaultDelimiter) ? "comma" : "tab"; + return literal; } } @@ -94,7 +100,7 @@ public SeparateValueInputFormat( if (!this.columns.isEmpty()) { for (String column : this.columns) { Preconditions.checkArgument( - !column.contains(format.getDefaultDelimiter()), + !column.contains(format.getDelimiterAsString()), "Column[%s] has a " + format.getLiteral() + ", it cannot", column ); @@ -142,26 +148,7 @@ public boolean isSplittable() @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return new SeparateValueReader( - inputRowSchema, - source, - temporaryDirectory, - listDelimiter, - columns, - findColumnsFromHeader, - skipHeaderRows, - format - ); - } - - public InputEntityReader createReader( - InputRowSchema inputRowSchema, - InputEntity source, - File temporaryDirectory, - FlatTextFormat format - ) - { - return format == FlatTextFormat.TSV ? new TsvReader( + return this.format == FlatTextFormat.TSV ? 
new TsvReader( inputRowSchema, source, temporaryDirectory, @@ -179,7 +166,7 @@ public InputEntityReader createReader( skipHeaderRows ); } - + @Override public boolean equals(Object o) { diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java index 38bbe0fc24c5..58c3225a7bb5 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java @@ -85,12 +85,12 @@ public static RFC4180Parser createOpenCsvParser(char separator) this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row this.format = format; - this.parser = createOpenCsvParser(format.getDefaultDelimiter().charAt(0)); + this.parser = createOpenCsvParser(format.getDelimiter()); if (this.columns != null) { for (String column : this.columns) { Preconditions.checkArgument( - !column.contains(format.getDefaultDelimiter()), + !column.contains(format.getDelimiterAsString()), "Column[%s] has a " + format.getLiteral() + ", it cannot", column ); diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java index 0a92d68c1fd7..0923862b7565 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -22,12 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; -import org.apache.druid.data.input.InputEntity; -import org.apache.druid.data.input.InputEntityReader; -import org.apache.druid.data.input.InputRowSchema; import 
javax.annotation.Nullable; -import java.io.File; import java.util.List; public class TsvInputFormat extends SeparateValueInputFormat @@ -44,12 +40,6 @@ public TsvInputFormat( super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, FlatTextFormat.TSV); } - @Override - public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) - { - return super.createReader(inputRowSchema, source, temporaryDirectory, FlatTextFormat.TSV); - } - @VisibleForTesting public TsvInputFormat( List columns, From c3c5556281a150ebeaf2d43306bee5b771331999 Mon Sep 17 00:00:00 2001 From: Junjie Gu Date: Fri, 22 Nov 2019 11:57:26 -0800 Subject: [PATCH 6/8] code refactor --- .../druid/data/input/impl/CsvInputFormat.java | 2 +- .../druid/data/input/impl/CsvReader.java | 2 +- .../input/impl/SeparateValueInputFormat.java | 38 ++++++++++--------- .../data/input/impl/SeparateValueReader.java | 4 +- .../druid/data/input/impl/TsvInputFormat.java | 2 +- .../druid/data/input/impl/TsvReader.java | 2 +- 6 files changed, 27 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index e0b488739240..e3a69fb5b0a6 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -37,7 +37,7 @@ public CsvInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, FlatTextFormat.CSV); + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.CSV); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java index cb3cc77718bc..92fe72d8248a 100644 --- 
a/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java @@ -46,7 +46,7 @@ public class CsvReader extends SeparateValueReader columns, findColumnsFromHeader, skipHeaderRows, - SeparateValueInputFormat.FlatTextFormat.CSV + SeparateValueInputFormat.Format.CSV ); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java index e104e1bcbd17..cd53a4d04d81 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java @@ -19,7 +19,6 @@ package org.apache.druid.data.input.impl; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -36,10 +35,15 @@ import java.util.List; import java.util.Objects; +/** + * SeparateValueInputFormat abstracts the (Comma/Tab) Separate Value format of input data. 
+ * It implements the common logic between {@link CsvInputFormat} and {@link TsvInputFormat} + * Should never be instantiated + */ public class SeparateValueInputFormat implements InputFormat { - public enum FlatTextFormat + public enum Format { CSV(',', "comma"), TSV('\t', "tab"); @@ -47,7 +51,7 @@ public enum FlatTextFormat private final char delimiter; private final String literal; - FlatTextFormat(char delimiter, String literal) + Format(char delimiter, String literal) { this.delimiter = delimiter; this.literal = literal; @@ -73,16 +77,15 @@ public String getLiteral() private final List columns; private final boolean findColumnsFromHeader; private final int skipHeaderRows; - private final FlatTextFormat format; - - @JsonCreator - public SeparateValueInputFormat( - @JsonProperty("columns") @Nullable List columns, - @JsonProperty("listDelimiter") @Nullable String listDelimiter, - @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow, - @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader, - @JsonProperty("skipHeaderRows") int skipHeaderRows, - FlatTextFormat format + private final Format format; + + protected SeparateValueInputFormat( + @Nullable List columns, + @Nullable String listDelimiter, + @Nullable Boolean hasHeaderRow, + @Nullable Boolean findColumnsFromHeader, + int skipHeaderRows, + Format format ) { this.listDelimiter = listDelimiter; @@ -148,7 +151,7 @@ public boolean isSplittable() @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return this.format == FlatTextFormat.TSV ? new TsvReader( + return this.format == Format.TSV ? 
new TsvReader( inputRowSchema, source, temporaryDirectory, @@ -166,7 +169,7 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity skipHeaderRows ); } - + @Override public boolean equals(Object o) { @@ -180,12 +183,13 @@ public boolean equals(Object o) return findColumnsFromHeader == format.findColumnsFromHeader && skipHeaderRows == format.skipHeaderRows && Objects.equals(listDelimiter, format.listDelimiter) && - Objects.equals(columns, format.columns); + Objects.equals(columns, format.columns) && + Objects.equals(this.format, format.format); } @Override public int hashCode() { - return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows); + return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows, format); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java index 58c3225a7bb5..ab95ed12d839 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java @@ -52,7 +52,7 @@ public class SeparateValueReader extends TextReader private final Function multiValueFunction; @Nullable private List columns; - private final SeparateValueInputFormat.FlatTextFormat format; + private final SeparateValueInputFormat.Format format; private final RFC4180Parser parser; public static RFC4180Parser createOpenCsvParser(char separator) @@ -75,7 +75,7 @@ public static RFC4180Parser createOpenCsvParser(char separator) @Nullable List columns, boolean findColumnsFromHeader, int skipHeaderRows, - SeparateValueInputFormat.FlatTextFormat format + SeparateValueInputFormat.Format format ) { super(inputRowSchema, source, temporaryDirectory); diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java 
index 0923862b7565..c75c5712b07a 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -37,7 +37,7 @@ public TsvInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, FlatTextFormat.TSV); + super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.TSV); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java index ea5337cda4de..961f61c634ea 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvReader.java @@ -46,7 +46,7 @@ public class TsvReader extends SeparateValueReader columns, findColumnsFromHeader, skipHeaderRows, - SeparateValueInputFormat.FlatTextFormat.TSV + SeparateValueInputFormat.Format.TSV ); } } From 83a24782970ef578a6a1f966b8d0a57a7ce249b6 Mon Sep 17 00:00:00 2001 From: Junjie Gu Date: Fri, 22 Nov 2019 13:25:48 -0800 Subject: [PATCH 7/8] mark abstract for base class meant not to be instantiated --- .../druid/data/input/impl/SeparateValueInputFormat.java | 2 +- .../apache/druid/data/input/impl/SeparateValueReader.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java index cd53a4d04d81..93a13e8b8038 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueInputFormat.java @@ -40,7 +40,7 @@ * It implements the common logic between {@link CsvInputFormat} and {@link TsvInputFormat} * Should never be instantiated */ -public 
class SeparateValueInputFormat implements InputFormat +public abstract class SeparateValueInputFormat implements InputFormat { public enum Format diff --git a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java index ab95ed12d839..9ed553c1d784 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/SeparateValueReader.java @@ -45,7 +45,12 @@ import java.util.List; import java.util.Map; -public class SeparateValueReader extends TextReader +/** + * SeparateValueReader abstracts the reader for (Comma/Tab) Separate Value format input data. + * It implements the common logic between {@link CsvReader} and {@link TsvReader} + * Should never be instantiated + */ +public abstract class SeparateValueReader extends TextReader { private final boolean findColumnsFromHeader; private final int skipHeaderRows; From cd2666a66d17a579e5f1436c60a0f79769489152 Mon Sep 17 00:00:00 2001 From: Junjie Gu Date: Fri, 22 Nov 2019 14:31:53 -0800 Subject: [PATCH 8/8] remove constructor for test --- .../druid/data/input/impl/CSVParseSpec.java | 2 +- .../druid/data/input/impl/CsvInputFormat.java | 12 ----------- .../druid/data/input/impl/TsvInputFormat.java | 12 ----------- .../data/input/impl/CsvInputFormatTest.java | 4 ++-- .../druid/data/input/impl/CsvReaderTest.java | 20 ++++++++++++------- .../impl/InputEntityIteratingReaderTest.java | 1 + .../impl/TimedShutoffInputSourceTest.java | 2 +- .../data/input/impl/TsvInputFormatTest.java | 4 ++-- .../druid/data/input/impl/TsvReaderTest.java | 20 ++++++++++++------- .../ParallelIndexSupervisorTaskSerdeTest.java | 2 +- .../sampler/CsvInputSourceSamplerTest.java | 2 +- .../sampler/InputSourceSamplerTest.java | 2 +- .../RecordSupplierInputSourceTest.java | 2 +- 13 files changed, 37 insertions(+), 48 deletions(-) diff --git 
a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java index 5340a4274517..51ffd3831a9b 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java @@ -100,7 +100,7 @@ public Parser makeParser() @Override public InputFormat toInputFormat() { - return new CsvInputFormat(columns, listDelimiter, hasHeaderRow, skipHeaderRows); + return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows); } @Override diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index e3a69fb5b0a6..28aa5c680de8 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import javax.annotation.Nullable; import java.util.List; @@ -39,15 +38,4 @@ public CsvInputFormat( { super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.CSV); } - - @VisibleForTesting - public CsvInputFormat( - List columns, - String listDelimiter, - boolean findColumnsFromHeader, - int skipHeaderRows - ) - { - this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows); - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java index c75c5712b07a..8bec9536989e 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/TsvInputFormat.java @@ -21,7 +21,6 @@ import 
com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import javax.annotation.Nullable; import java.util.List; @@ -39,15 +38,4 @@ public TsvInputFormat( { super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.TSV); } - - @VisibleForTesting - public TsvInputFormat( - List columns, - String listDelimiter, - boolean findColumnsFromHeader, - int skipHeaderRows - ) - { - this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows); - } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java index 8d2d688e9868..b09ba86f09e9 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java @@ -38,7 +38,7 @@ public class CsvInputFormatTest public void testSerde() throws IOException { final ObjectMapper mapper = new ObjectMapper(); - final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", true, 10); + final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), "|", null, true, 10); final byte[] bytes = mapper.writeValueAsBytes(format); final CsvInputFormat fromJson = (CsvInputFormat) mapper.readValue(bytes, InputFormat.class); Assert.assertEquals(format, fromJson); @@ -49,6 +49,6 @@ public void testComma() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Column[a,] has a comma, it cannot"); - new CsvInputFormat(Collections.singletonList("a,"), ",", false, 0); + new CsvInputFormat(Collections.singletonList("a,"), ",", null, false, 0); } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java index 
9d7bae04ce58..ec942379f3b2 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvReaderTest.java @@ -66,7 +66,7 @@ public void testWithoutHeaders() throws IOException "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 0); assertResult(source, format); } @@ -81,7 +81,7 @@ public void testFindColumn() throws IOException "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, true, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, null, true, 0); assertResult(source, format); } @@ -96,7 +96,7 @@ public void testSkipHeaders() throws IOException "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 1); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 1); assertResult(source, format); } @@ -112,7 +112,7 @@ public void testFindColumnAndSkipHeaders() throws IOException "2019-01-01T00:00:30Z,name_3,15" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, true, 1); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), null, null, true, 1); assertResult(source, format); } @@ -127,7 +127,7 @@ public void testMultiValues() throws IOException "2019-01-01T00:00:30Z,name_3,15|3" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", true, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", null, true, 0); final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); int numResults = 0; try (CloseableIterator iterator = 
reader.read()) { @@ -218,7 +218,13 @@ public void testQuotes() throws IOException ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z") ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("Value", "Comment", "Timestamp"), null, false, 0); + final CsvInputFormat format = new CsvInputFormat( + ImmutableList.of("Value", "Comment", "Timestamp"), + null, + null, + false, + 0 + ); final InputEntityReader reader = format.createReader( new InputRowSchema( new TimestampSpec("Timestamp", "auto", null), @@ -246,7 +252,7 @@ public void testRussianTextMess() throws IOException "2019-01-01T00:00:10Z,name_1,\"Как говорится: \\\"\"всё течет, всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\"" ) ); - final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0); + final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, null, false, 0); final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); try (CloseableIterator iterator = reader.read()) { Assert.assertTrue(iterator.hasNext()); diff --git a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java index 6d55f14d103c..7bde81c3cb35 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java @@ -69,6 +69,7 @@ public void test() throws IOException new CsvInputFormat( ImmutableList.of("time", "name", "score"), null, + null, false, 0 ), diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java index 
4f04e880d9b1..2b4b2fc860a6 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/TimedShutoffInputSourceTest.java @@ -43,7 +43,7 @@ public void testTimeoutShutoff() throws IOException, InterruptedException new InlineInputSource("this,is,test\nthis,data,has\n3,rows,\n"), DateTimes.nowUtc().plusMillis(timeoutMs) ); - final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, false, 0); + final InputFormat inputFormat = new CsvInputFormat(ImmutableList.of("col1", "col2", "col3"), null, null, false, 0); final InputSourceReader reader = inputSource.reader( new InputRowSchema(new TimestampSpec(null, null, null), new DimensionsSpec(null), Collections.emptyList()), inputFormat, diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java index f5afda470a7b..b160e4bc223c 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvInputFormatTest.java @@ -38,7 +38,7 @@ public class TsvInputFormatTest public void testSerde() throws IOException { final ObjectMapper mapper = new ObjectMapper(); - final TsvInputFormat format = new TsvInputFormat(Collections.singletonList("a"), "|", true, 10); + final TsvInputFormat format = new TsvInputFormat(Collections.singletonList("a"), "|", null, true, 10); final byte[] bytes = mapper.writeValueAsBytes(format); final TsvInputFormat fromJson = (TsvInputFormat) mapper.readValue(bytes, InputFormat.class); Assert.assertEquals(format, fromJson); @@ -49,6 +49,6 @@ public void testTab() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Column[a\t] has a tab, it cannot"); - new TsvInputFormat(Collections.singletonList("a\t"), ",", false, 0); + new 
TsvInputFormat(Collections.singletonList("a\t"), ",", null, false, 0); } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java index f3df33ebf8e8..8f42e40ffae8 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/TsvReaderTest.java @@ -66,7 +66,7 @@ public void testWithoutHeaders() throws IOException "2019-01-01T00:00:30Z\tname_3\t15" ) ); - final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 0); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 0); assertResult(source, format); } @@ -81,7 +81,7 @@ public void testFindColumn() throws IOException "2019-01-01T00:00:30Z\tname_3\t15" ) ); - final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, true, 0); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 0); assertResult(source, format); } @@ -96,7 +96,7 @@ public void testSkipHeaders() throws IOException "2019-01-01T00:00:30Z\tname_3\t15" ) ); - final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, false, 1); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "score"), null, null, false, 1); assertResult(source, format); } @@ -112,7 +112,7 @@ public void testFindColumnAndSkipHeaders() throws IOException "2019-01-01T00:00:30Z\tname_3\t15" ) ); - final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, true, 1); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), null, null, true, 1); assertResult(source, format); } @@ -127,7 +127,7 @@ public void testMultiValues() throws IOException "2019-01-01T00:00:30Z\tname_3\t15|3" ) ); - final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), "|", true, 
0); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of(), "|", null, true, 0); final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); int numResults = 0; try (CloseableIterator iterator = reader.read()) { @@ -218,7 +218,13 @@ public void testQuotes() throws IOException ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z") ) ); - final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("Value", "Comment", "Timestamp"), null, false, 0); + final TsvInputFormat format = new TsvInputFormat( + ImmutableList.of("Value", "Comment", "Timestamp"), + null, + null, + false, + 0 + ); final InputEntityReader reader = format.createReader( new InputRowSchema( new TimestampSpec("Timestamp", "auto", null), @@ -246,7 +252,7 @@ public void testRussianTextMess() throws IOException "2019-01-01T00:00:10Z\tname_1\t\"Как говорится: \\\"\"всё течет\t всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. 
#ПровокацияКиева\"" ) ); - final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0); + final TsvInputFormat format = new TsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, null, false, 0); final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null); try (CloseableIterator iterator = reader.read()) { Assert.assertTrue(iterator.hasNext()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index 97c6954e9247..b087e25d823b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -217,7 +217,7 @@ private static class ParallelIndexIngestionSpecBuilder private final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( null, new LocalInputSource(new File("tmp"), "test_*"), - new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, false, 0), + new CsvInputFormat(Arrays.asList("ts", "dim", "val"), null, null, false, 0), false ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java index 753ad10326a1..36ae63026c53 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java @@ -58,7 +58,7 @@ public void testCSVColumnAllNull() "Michael,Jackson,,Male" ); final InputSource inputSource = new 
InlineInputSource(String.join("\n", strCsvRows)); - final InputFormat inputFormat = new CsvInputFormat(null, null, true, 0); + final InputFormat inputFormat = new CsvInputFormat(null, null, null, true, 0); final InputSourceSampler inputSourceSampler = new InputSourceSampler(); final SamplerResponse response = inputSourceSampler.sample( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index cbfadfad75f9..ee6b9df9ab8f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -1090,7 +1090,7 @@ private InputFormat createInputFormat() case STR_JSON: return new JsonInputFormat(null, null); case STR_CSV: - return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, false, 0); + return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, null, false, 0); default: throw new IAE("Unknown parser type: %s", parserType); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java index fbd6a91bb872..e93eb10ebf20 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java @@ -71,7 +71,7 @@ public void testRead() throws IOException final List colNames = IntStream.range(0, NUM_COLS) .mapToObj(i -> StringUtils.format("col_%d", i)) .collect(Collectors.toList()); - final InputFormat inputFormat = new CsvInputFormat(colNames, null, false, 0); + final InputFormat inputFormat = 
new CsvInputFormat(colNames, null, null, false, 0); final InputSourceReader reader = inputSource.reader( new InputRowSchema( new TimestampSpec("col_0", "auto", null),