From e2304f26f00cd2a2695e735b1245555174396524 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 15 Jan 2020 13:39:57 -0800 Subject: [PATCH 1/6] working --- .../data/input/impl/DelimitedInputFormat.java | 155 +++------------ .../data/input/impl/DelimitedValueReader.java | 9 +- .../data/input/impl/FlatTextInputFormat.java | 180 ++++++++++++++++++ 3 files changed, 216 insertions(+), 128 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java index 7d6bc012f6ee..29446a6a92b4 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.impl; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -38,53 +39,12 @@ /** * InputFormat for customized Delimitor Separate Value format of input data(default is TSV). */ -public class DelimitedInputFormat implements InputFormat +public class DelimitedInputFormat extends FlatTextInputFormat { - - public enum Format - { - CSV(',', "comma"), - TSV('\t', "tab"), - CustomizeSV('|', ""); - - private char delimiter; - private String literal; - - Format(char delimiter, String literal) - { - this.delimiter = delimiter; - this.literal = literal; - } - - public String getDelimiterAsString() - { - return String.valueOf(delimiter); - } - - private void setDelimiter(String delimiter, String literal) - { - this.delimiter = (delimiter != null && delimiter.length() > 0) ? delimiter.charAt(0) : '\t'; - this.literal = literal != null ? literal : "customize separator: " + delimiter; - } - - public char getDelimiter() - { - return delimiter; - } - - public String getLiteral() - { - return literal; - } - } - - private final String listDelimiter; - private final List columns; - private final boolean findColumnsFromHeader; - private final int skipHeaderRows; - private final Format format; + private static final String DEFAULT_DELIMITER = "\t"; private final String delimiter; + @JsonCreator public DelimitedInputFormat( @JsonProperty("columns") @Nullable List columns, @JsonProperty("listDelimiter") @Nullable String listDelimiter, @@ -94,79 +54,49 @@ public DelimitedInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - this.listDelimiter = listDelimiter; - this.columns = columns == null ? Collections.emptyList() : columns; - //noinspection ConstantConditions - this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("hasHeaderRow", hasHeaderRow), - new Property<>("findColumnsFromHeader", findColumnsFromHeader) - ) - ).getValue(); - this.skipHeaderRows = skipHeaderRows; - this.delimiter = delimiter == null ? "\t" : delimiter; - this.format = getFormat(this.delimiter); - Preconditions.checkArgument( - this.delimiter.length() == 1, - "The delimiter should be a single character" - ); - Preconditions.checkArgument( - !this.delimiter.equals(listDelimiter), - "Cannot have same delimiter and list delimiter of [%s]", - this.delimiter + super( + columns, + listDelimiter, + delimiter == null ? DEFAULT_DELIMITER : delimiter, + hasHeaderRow, + findColumnsFromHeader, + skipHeaderRows ); - if (!this.columns.isEmpty()) { - for (String column : this.columns) { - Preconditions.checkArgument( - !column.contains(format.getDelimiterAsString()), - "Column[%s] has a " + format.getLiteral() + ", it cannot", - column - ); - } - } else { - Preconditions.checkArgument( - this.findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." - ); - } + this.delimiter = delimiter; } - private static Format getFormat(String delimiter) - { - if (",".equals(delimiter)) { - return Format.CSV; - } else if ("\t".equals(delimiter)) { - return Format.TSV; - } else { - Format.CustomizeSV.setDelimiter(delimiter, null); - return Format.CustomizeSV; - } - } - - + @Override @JsonProperty public List getColumns() { - return columns; + return super.getColumns(); } + @Override @JsonProperty public String getListDelimiter() { - return listDelimiter; + return super.getListDelimiter(); + } + + @JsonProperty("delimiter") + public String getDelimiterString() + { + return delimiter; } + @Override @JsonProperty public boolean isFindColumnsFromHeader() { - return findColumnsFromHeader; + return super.isFindColumnsFromHeader(); } + @Override @JsonProperty public int getSkipHeaderRows() { - return skipHeaderRows; + return super.getSkipHeaderRows(); } @Override @@ -182,34 +112,11 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity inputRowSchema, source, temporaryDirectory, - listDelimiter, - columns, - findColumnsFromHeader, - skipHeaderRows, - this.format + getListDelimiter(), + getColumns(), + isFindColumnsFromHeader(), + getSkipHeaderRows(), + getDelimiter() ); } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - DelimitedInputFormat format = (DelimitedInputFormat) o; - return findColumnsFromHeader == format.findColumnsFromHeader && - skipHeaderRows == format.skipHeaderRows && - Objects.equals(listDelimiter, format.listDelimiter) && - Objects.equals(columns, format.columns) && - Objects.equals(this.format, format.format); - } - - @Override - public int hashCode() - { - return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows, format); - } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java index cae21b148ef5..6d68f55c8ba0 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java @@ -31,6 +31,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.TextReader; +import org.apache.druid.data.input.impl.FlatTextInputFormat.Delimiter; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.parsers.ParseException; @@ -77,7 +78,7 @@ public static RFC4180Parser createOpenCsvParser(char separator) @Nullable List columns, boolean findColumnsFromHeader, int skipHeaderRows, - DelimitedInputFormat.Format format + Delimiter delimiter ) { super(inputRowSchema, source, temporaryDirectory); @@ -86,13 +87,13 @@ public static RFC4180Parser createOpenCsvParser(char separator) final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row - this.parser = createOpenCsvParser(format.getDelimiter()); + this.parser = createOpenCsvParser(delimiter.getDelimiter()); if (this.columns != null) { for (String column : this.columns) { Preconditions.checkArgument( - !column.contains(format.getDelimiterAsString()), - "Column[%s] has a " + format.getLiteral() + ", it cannot", + !column.contains(delimiter.getDelimiterAsString()), + "Column[%s] has a " + delimiter.getLiteral() + ", it cannot", column ); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java new file mode 100644 index 000000000000..ee6bea05f399 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.indexer.Checks; +import org.apache.druid.indexer.Property; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public abstract class FlatTextInputFormat implements InputFormat +{ + static class Delimiter + { + private final char delimiter; + private final String literal; + + Delimiter(char delimiter, String literal) + { + this.delimiter = delimiter; + this.literal = literal; + } + +// private void setDelimiter(String delimiter, String literal) +// { +// this.delimiter = (delimiter != null && delimiter.length() > 0) ? delimiter.charAt(0) : '\t'; +// this.literal = literal != null ? literal : "customize separator: " + delimiter; +// } + + public String getDelimiterAsString() + { + return String.valueOf(delimiter); + } + + public char getDelimiter() + { + return delimiter; + } + + public String getLiteral() + { + return literal; + } + } + + private final String listDelimiter; + private final List columns; + private final boolean findColumnsFromHeader; + private final int skipHeaderRows; + private final Delimiter delimiter; + + FlatTextInputFormat( + @Nullable List columns, + @Nullable String listDelimiter, + String stringDelimiter, + @Nullable Boolean hasHeaderRow, + @Nullable Boolean findColumnsFromHeader, + int skipHeaderRows + ) + { + this.listDelimiter = listDelimiter; + this.columns = columns == null ? Collections.emptyList() : columns; + //noinspection ConstantConditions + this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("hasHeaderRow", hasHeaderRow), + new Property<>("findColumnsFromHeader", findColumnsFromHeader) + ) + ).getValue(); + this.skipHeaderRows = skipHeaderRows; +// this.stringDelimiter = delimiter == null ? "\t" : delimiter; + this.delimiter = new Delimiter(Preconditions.checkNotNull(stringDelimiter, "stringDelimiter"), ) + Preconditions.checkArgument( + this.stringDelimiter.length() == 1, + "The delimiter should be a single character" + ); + Preconditions.checkArgument( + !this.stringDelimiter.equals(listDelimiter), + "Cannot have same delimiter and list delimiter of [%s]", + this.delimiter + ); + if (!this.columns.isEmpty()) { + for (String column : this.columns) { + Preconditions.checkArgument( + !column.contains(this.delimiter.getDelimiterAsString()), + "Column[%s] has a " + this.delimiter.getLiteral() + ", it cannot", + column + ); + } + } else { + Preconditions.checkArgument( + this.findColumnsFromHeader, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." + ); + } + } + +// private static Delimiter getFormat(String delimiter) +// { +// if (",".equals(delimiter)) { +// return Delimiter.COMMA; +// } else if ("\t".equals(delimiter)) { +// return Delimiter.TAB; +// } else { +// Delimiter.CUSTOM.setDelimiter(delimiter, null); +// return Delimiter.CUSTOM; +// } +// } + + public Delimiter getDelimiter() + { + return delimiter; + } + + public List getColumns() + { + return columns; + } + + public String getListDelimiter() + { + return listDelimiter; + } + + public boolean isFindColumnsFromHeader() + { + return findColumnsFromHeader; + } + + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlatTextInputFormat that = (FlatTextInputFormat) o; + return findColumnsFromHeader == that.findColumnsFromHeader && + skipHeaderRows == that.skipHeaderRows && + Objects.equals(listDelimiter, that.listDelimiter) && + Objects.equals(columns, that.columns) && + Objects.equals(delimiter, that.delimiter); + } + + @Override + public int hashCode() + { + return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows, delimiter); + } +} From d3f64e5a4aaa9124cb04ea8436bdc44e8d3c380d Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 16 Jan 2020 14:45:07 -0800 Subject: [PATCH 2/6] - support multi-char delimiter for tsv - respect "delimiter" property for tsv --- .../apache/druid/data/input/TextReader.java | 7 +- .../druid/data/input/impl/CSVParser.java | 56 ++++++++++++++ .../druid/data/input/impl/CsvInputFormat.java | 34 ++++++++- .../data/input/impl/DelimitedInputFormat.java | 73 ++++++++---------- .../data/input/impl/DelimitedValueReader.java | 49 +++--------- .../data/input/impl/FlatTextInputFormat.java | 76 ++++--------------- .../data/input/impl/JsonInputFormat.java | 2 +- .../druid/data/input/impl/JsonReader.java | 4 +- .../data/input/impl/RegexInputFormat.java | 2 +- .../druid/data/input/impl/RegexReader.java | 4 +- .../java/util/common/parsers/CSVParser.java | 15 ++-- .../util/common/parsers/DelimitedParser.java | 37 ++------- .../input/impl/DelimitedInputFormatTest.java | 2 +- 13 files changed, 165 insertions(+), 196 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java diff --git a/core/src/main/java/org/apache/druid/data/input/TextReader.java b/core/src/main/java/org/apache/druid/data/input/TextReader.java index 33151bd0316b..7214d322399f 100644 --- a/core/src/main/java/org/apache/druid/data/input/TextReader.java +++ b/core/src/main/java/org/apache/druid/data/input/TextReader.java @@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParserUtils; -import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; @@ -38,14 +37,12 @@ public abstract class TextReader extends IntermediateRowParsingReader { private final InputRowSchema inputRowSchema; - final InputEntity source; - final File temporaryDirectory; + private final InputEntity source; - public TextReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + public TextReader(InputRowSchema inputRowSchema, InputEntity source) { this.inputRowSchema = inputRowSchema; this.source = source; - this.temporaryDirectory = temporaryDirectory; } public InputRowSchema getInputRowSchema() diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java b/core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java new file mode 100644 index 000000000000..0e98a3720a98 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.opencsv.RFC4180Parser; +import com.opencsv.RFC4180ParserBuilder; +import com.opencsv.enums.CSVReaderNullFieldIndicator; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.impl.DelimitedValueReader.DelimitedValueParser; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public class CSVParser implements DelimitedValueParser +{ + private static final char SEPERATOR = ','; + private final RFC4180Parser delegate; + + public static RFC4180Parser createOpenCsvParser() + { + return NullHandling.replaceWithDefault() + ? new RFC4180ParserBuilder().withSeparator(SEPERATOR).build() + : new RFC4180ParserBuilder().withFieldAsNull(CSVReaderNullFieldIndicator.EMPTY_SEPARATORS) + .withSeparator(SEPERATOR) + .build(); + } + + CSVParser() + { + delegate = createOpenCsvParser(); + } + + @Override + public List parseLine(String line) throws IOException + { + return Arrays.asList(delegate.parseLine(line)); + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index fe9f438317f8..5e9c959fe62c 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -20,12 +20,17 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; import javax.annotation.Nullable; +import java.io.File; import java.util.List; -public class CsvInputFormat extends DelimitedInputFormat +public class CsvInputFormat extends FlatTextInputFormat { @JsonCreator public CsvInputFormat( @@ -38,4 +43,31 @@ public CsvInputFormat( { super(columns, listDelimiter, ",", hasHeaderRow, findColumnsFromHeader, skipHeaderRows); } + + @Override + @JsonIgnore + public String getDelimiter() + { + return super.getDelimiter(); + } + + @Override + public boolean isSplittable() + { + return true; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + return new DelimitedValueReader( + inputRowSchema, + source, + getListDelimiter(), + getColumns(), + isFindColumnsFromHeader(), + getSkipHeaderRows(), + new CSVParser() + ); + } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java index 29446a6a92b4..8562cdeea608 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedInputFormat.java @@ -22,19 +22,18 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; +import com.google.common.base.Splitter; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; -import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.indexer.Checks; -import org.apache.druid.indexer.Property; import javax.annotation.Nullable; import java.io.File; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.Objects; /** * InputFormat for customized Delimitor Separate Value format of input data(default is TSV). @@ -42,7 +41,6 @@ public class DelimitedInputFormat extends FlatTextInputFormat { private static final String DEFAULT_DELIMITER = "\t"; - private final String delimiter; @JsonCreator public DelimitedInputFormat( @@ -62,41 +60,6 @@ public DelimitedInputFormat( findColumnsFromHeader, skipHeaderRows ); - this.delimiter = delimiter; - } - - @Override - @JsonProperty - public List getColumns() - { - return super.getColumns(); - } - - @Override - @JsonProperty - public String getListDelimiter() - { - return super.getListDelimiter(); - } - - @JsonProperty("delimiter") - public String getDelimiterString() - { - return delimiter; - } - - @Override - @JsonProperty - public boolean isFindColumnsFromHeader() - { - return super.isFindColumnsFromHeader(); - } - - @Override - @JsonProperty - public int getSkipHeaderRows() - { - return super.getSkipHeaderRows(); } @Override @@ -111,12 +74,36 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity return new DelimitedValueReader( inputRowSchema, source, - temporaryDirectory, getListDelimiter(), getColumns(), isFindColumnsFromHeader(), getSkipHeaderRows(), - getDelimiter() + line -> splitToList(Splitter.on(getDelimiter()), line) ); } + + /** + * Copied from Guava's {@link Splitter#splitToList(CharSequence)}. + * This is to avoid the error of the missing method signature when using an old Guava library. + * For example, it may happen when running Druid Hadoop indexing jobs, since we may inherit the version provided by + * the Hadoop cluster. See https://github.com/apache/druid/issues/6801. + */ + public static List splitToList(Splitter splitter, String input) + { + Preconditions.checkNotNull(input); + + Iterator iterator = splitter.split(input).iterator(); + List result = new ArrayList<>(); + + while (iterator.hasNext()) { + String splitValue = iterator.next(); + if (!NullHandling.replaceWithDefault() && splitValue.isEmpty()) { + result.add(null); + } else { + result.add(splitValue); + } + } + + return Collections.unmodifiableList(result); + } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java index 6d68f55c8ba0..b8b524e6c874 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedValueReader.java @@ -23,15 +23,10 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import com.opencsv.RFC4180Parser; -import com.opencsv.RFC4180ParserBuilder; -import com.opencsv.enums.CSVReaderNullFieldIndicator; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.TextReader; -import org.apache.druid.data.input.impl.FlatTextInputFormat.Delimiter; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.collect.Utils; import org.apache.druid.java.util.common.parsers.ParseException; @@ -39,9 +34,7 @@ import org.apache.druid.java.util.common.parsers.Parsers; import javax.annotation.Nullable; -import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -54,56 +47,32 @@ public class DelimitedValueReader extends TextReader private final boolean findColumnsFromHeader; private final int skipHeaderRows; private final Function multiValueFunction; + private final DelimitedValueParser parser; @Nullable private List columns; - private final RFC4180Parser parser; - public static RFC4180Parser createOpenCsvParser(char separator) + interface DelimitedValueParser { - return NullHandling.replaceWithDefault() - ? new RFC4180ParserBuilder() - .withSeparator(separator) - .build() - : new RFC4180ParserBuilder().withFieldAsNull( - CSVReaderNullFieldIndicator.EMPTY_SEPARATORS) - .withSeparator(separator) - .build(); + List parseLine(String line) throws IOException; } DelimitedValueReader( InputRowSchema inputRowSchema, InputEntity source, - File temporaryDirectory, @Nullable String listDelimiter, @Nullable List columns, boolean findColumnsFromHeader, int skipHeaderRows, - Delimiter delimiter + DelimitedValueParser parser ) { - super(inputRowSchema, source, temporaryDirectory); + super(inputRowSchema, source); this.findColumnsFromHeader = findColumnsFromHeader; this.skipHeaderRows = skipHeaderRows; final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter)); this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row - this.parser = createOpenCsvParser(delimiter.getDelimiter()); - - if (this.columns != null) { - for (String column : this.columns) { - Preconditions.checkArgument( - !column.contains(delimiter.getDelimiterAsString()), - "Column[%s] has a " + delimiter.getLiteral() + ", it cannot", - column - ); - } - } else { - Preconditions.checkArgument( - findColumnsFromHeader, - "If columns field is not set, the first row of your data must have your header" - + " and hasHeaderRow must be set to true." - ); - } + this.parser = parser; } @Override @@ -121,10 +90,10 @@ public Map toMap(String intermediateRow) throws IOException private Map parseLine(String line) throws IOException { - final String[] parsed = parser.parseLine(line); + final List parsed = parser.parseLine(line); return Utils.zipMapPartial( Preconditions.checkNotNull(columns, "columns"), - Iterables.transform(Arrays.asList(parsed), multiValueFunction) + Iterables.transform(parsed, multiValueFunction) ); } @@ -146,7 +115,7 @@ public void processHeaderLine(String line) throws IOException if (!findColumnsFromHeader) { throw new ISE("Don't call this if findColumnsFromHeader = false"); } - columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line))); + columns = findOrCreateColumnNames(parser.parseLine(line)); if (columns.isEmpty()) { throw new ISE("Empty columns"); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java index ee6bea05f399..b7aa371e64f2 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.impl; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputFormat; @@ -32,56 +33,24 @@ public abstract class FlatTextInputFormat implements InputFormat { - static class Delimiter - { - private final char delimiter; - private final String literal; - - Delimiter(char delimiter, String literal) - { - this.delimiter = delimiter; - this.literal = literal; - } - -// private void setDelimiter(String delimiter, String literal) -// { -// this.delimiter = (delimiter != null && delimiter.length() > 0) ? delimiter.charAt(0) : '\t'; -// this.literal = literal != null ? literal : "customize separator: " + delimiter; -// } - - public String getDelimiterAsString() - { - return String.valueOf(delimiter); - } - - public char getDelimiter() - { - return delimiter; - } - - public String getLiteral() - { - return literal; - } - } - - private final String listDelimiter; private final List columns; + private final String listDelimiter; + private final String delimiter; private final boolean findColumnsFromHeader; private final int skipHeaderRows; - private final Delimiter delimiter; FlatTextInputFormat( @Nullable List columns, @Nullable String listDelimiter, - String stringDelimiter, + String delimiter, @Nullable Boolean hasHeaderRow, @Nullable Boolean findColumnsFromHeader, int skipHeaderRows ) { - this.listDelimiter = listDelimiter; this.columns = columns == null ? Collections.emptyList() : columns; + this.listDelimiter = listDelimiter; + this.delimiter = Preconditions.checkNotNull(delimiter, "delimiter"); //noinspection ConstantConditions this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( ImmutableList.of( @@ -90,22 +59,16 @@ public String getLiteral() ) ).getValue(); this.skipHeaderRows = skipHeaderRows; -// this.stringDelimiter = delimiter == null ? "\t" : delimiter; - this.delimiter = new Delimiter(Preconditions.checkNotNull(stringDelimiter, "stringDelimiter"), ) - Preconditions.checkArgument( - this.stringDelimiter.length() == 1, - "The delimiter should be a single character" - ); Preconditions.checkArgument( - !this.stringDelimiter.equals(listDelimiter), + !delimiter.equals(listDelimiter), "Cannot have same delimiter and list delimiter of [%s]", - this.delimiter + delimiter ); if (!this.columns.isEmpty()) { for (String column : this.columns) { Preconditions.checkArgument( - !column.contains(this.delimiter.getDelimiterAsString()), - "Column[%s] has a " + this.delimiter.getLiteral() + ", it cannot", + !column.contains(delimiter), + "Column[%s] cannot have the delimiter[" + delimiter + "] in its name", column ); } @@ -118,38 +81,31 @@ public String getLiteral() } } -// private static Delimiter getFormat(String delimiter) -// { -// if (",".equals(delimiter)) { -// return Delimiter.COMMA; -// } else if ("\t".equals(delimiter)) { -// return Delimiter.TAB; -// } else { -// Delimiter.CUSTOM.setDelimiter(delimiter, null); -// return Delimiter.CUSTOM; -// } -// } - - public Delimiter getDelimiter() + @JsonProperty + public String getDelimiter() { return delimiter; } + @JsonProperty public List getColumns() { return columns; } + @JsonProperty public String getListDelimiter() { return listDelimiter; } + @JsonProperty public boolean isFindColumnsFromHeader() { return findColumnsFromHeader; } + @JsonProperty public int getSkipHeaderRows() { return skipHeaderRows; diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 0cf8a9fef09d..5026babbbbeb 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -70,7 +70,7 @@ public boolean isSplittable() @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return new JsonReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), objectMapper); + return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper); } @Override diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java index 50617fce4d20..f1e8cce68676 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; -import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -45,12 +44,11 @@ public class JsonReader extends TextReader JsonReader( InputRowSchema inputRowSchema, InputEntity source, - File temporaryDirectory, JSONPathSpec flattenSpec, ObjectMapper mapper ) { - super(inputRowSchema, source, temporaryDirectory); + super(inputRowSchema, source); this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker()); this.mapper = mapper; } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java index 239ee6378bfb..116551669dbe 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java @@ -57,6 +57,6 @@ public boolean isSplittable() @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return new RegexReader(inputRowSchema, source, temporaryDirectory, pattern, listDelimiter, columns); + return new RegexReader(inputRowSchema, source, pattern, listDelimiter, columns); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java index c7105c8afd0a..2962ebd06187 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.parsers.Parsers; import javax.annotation.Nullable; -import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -51,13 +50,12 @@ public class RegexReader extends TextReader RegexReader( InputRowSchema inputRowSchema, InputEntity source, - File temporaryDirectory, String pattern, @Nullable String listDelimiter, @Nullable List columns ) { - super(inputRowSchema, source, temporaryDirectory); + super(inputRowSchema, source); this.pattern = pattern; this.compiled = Pattern.compile(pattern); final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter; diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java index e6da347bdfff..ad88f304d383 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.opencsv.RFC4180Parser; -import org.apache.druid.data.input.impl.DelimitedValueReader; import javax.annotation.Nullable; import java.io.IOException; @@ -30,7 +29,7 @@ public class CSVParser extends AbstractFlatTextFormatParser { - private final RFC4180Parser parser = DelimitedValueReader.createOpenCsvParser(','); + private final RFC4180Parser parser = org.apache.druid.data.input.impl.CSVParser.createOpenCsvParser(); public CSVParser( @Nullable final String listDelimiter, @@ -53,12 +52,6 @@ public CSVParser( setFieldNames(fieldNames); } - @Override - protected List parseLine(String input) throws IOException - { - return Arrays.asList(parser.parseLine(input)); - } - @VisibleForTesting CSVParser(@Nullable final String listDelimiter, final String header) { @@ -66,4 +59,10 @@ protected List parseLine(String input) throws IOException setFieldNames(header); } + + @Override + protected List parseLine(String input) throws IOException + { + return Arrays.asList(parser.parseLine(input)); + } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/DelimitedParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/DelimitedParser.java index 0c0dccc1ec5a..df75cc3f7343 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/DelimitedParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/DelimitedParser.java @@ -22,17 +22,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.impl.DelimitedInputFormat; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.List; public class DelimitedParser extends AbstractFlatTextFormatParser { - private final String delimiter; private final Splitter splitter; public DelimitedParser( @@ -43,15 +39,15 @@ public DelimitedParser( ) { super(listDelimiter, hasHeaderRow, maxSkipHeaderRows); - this.delimiter = delimiter != null ? delimiter : FlatTextFormat.DELIMITED.getDefaultDelimiter(); + final String finalDelimiter = delimiter != null ? delimiter : FlatTextFormat.DELIMITED.getDefaultDelimiter(); Preconditions.checkState( - !this.delimiter.equals(getListDelimiter()), + !finalDelimiter.equals(getListDelimiter()), "Cannot have same delimiter and list delimiter of [%s]", - this.delimiter + finalDelimiter ); - this.splitter = Splitter.on(this.delimiter); + this.splitter = Splitter.on(finalDelimiter); } public DelimitedParser( @@ -81,28 +77,9 @@ protected List parseLine(String input) return splitToList(input); } - /** - * Copied from Guava's {@link Splitter#splitToList(CharSequence)}. - * This is to avoid the error of the missing method signature when using an old Guava library. - * For example, it may happen when running Druid Hadoop indexing jobs, since we may inherit the version provided by - * the Hadoop cluster. See https://github.com/apache/druid/issues/6801. - */ + private List splitToList(String input) { - Preconditions.checkNotNull(input); - - Iterator iterator = splitter.split(input).iterator(); - List result = new ArrayList(); - - while (iterator.hasNext()) { - String splitValue = iterator.next(); - if (!NullHandling.replaceWithDefault() && splitValue.isEmpty()) { - result.add(null); - } else { - result.add(splitValue); - } - } - - return Collections.unmodifiableList(result); + return DelimitedInputFormat.splitToList(splitter, input); } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java index af06d8a81864..6971da1fc4c0 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java @@ -41,7 +41,7 @@ public void testSerde() throws IOException final DelimitedInputFormat format = new DelimitedInputFormat( Collections.singletonList("a"), "|", - null, + "delim", null, true, 10 From 1b2da156bbd2f175d95035bf8ad6d687e550f2eb Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 16 Jan 2020 15:29:25 -0800 Subject: [PATCH 3/6] default value check for findColumnsFromHeader --- .../data/input/impl/FlatTextInputFormat.java | 16 ++-- .../data/input/impl/CsvInputFormatTest.java | 46 ++++++++++- .../input/impl/DelimitedInputFormatTest.java | 77 ++++++++++++++++--- 3 files changed, 122 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java index b7aa371e64f2..551ebec94a26 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java @@ -52,12 +52,16 @@ public abstract class FlatTextInputFormat implements InputFormat this.listDelimiter = listDelimiter; this.delimiter = Preconditions.checkNotNull(delimiter, "delimiter"); //noinspection ConstantConditions - this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( - ImmutableList.of( - new Property<>("hasHeaderRow", hasHeaderRow), - new Property<>("findColumnsFromHeader", findColumnsFromHeader) - ) - ).getValue(); + if (columns == null || columns.isEmpty()) { + this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty( + ImmutableList.of( + new Property<>("hasHeaderRow", hasHeaderRow), + new Property<>("findColumnsFromHeader", findColumnsFromHeader) + ) + ).getValue(); + } else { + this.findColumnsFromHeader = false; + } this.skipHeaderRows = skipHeaderRows; Preconditions.checkArgument( !delimiter.equals(listDelimiter), diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java index ad2316c99938..90d8a0badd8a 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java @@ -48,7 +48,7 @@ public void testSerde() throws IOException public void testComma() { expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Column[a,] has a comma, it cannot"); + expectedException.expectMessage("Column[a,] cannot have the delimiter[,] in its name"); new CsvInputFormat(Collections.singletonList("a,"), "|", null, false, 0); } @@ -59,4 +59,48 @@ public void testDelimiter() expectedException.expectMessage("Cannot have same delimiter and list delimiter of [,]"); new CsvInputFormat(Collections.singletonList("a\t"), ",", null, false, 0); } + + @Test + public void testFindColumnsFromHeaderWithColumnsReturningFalse() + { + final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), null, null, true, 0); + Assert.assertFalse(format.isFindColumnsFromHeader()); + } + + @Test + public void testFindColumnsFromHeaderWithMissingColumnsReturningItsValue() + { + final CsvInputFormat format = new CsvInputFormat(null, null, null, true, 0); + Assert.assertTrue(format.isFindColumnsFromHeader()); + } + + @Test + public void testMissingFindColumnsFromHeaderWithMissingColumnsThrowingError() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("At least one of [Property{name='hasHeaderRow', value=null}"); + new CsvInputFormat(null, null, null, null, 0); + } + + @Test + public void testMissingFindColumnsFromHeaderWithColumnsReturningFalse() + { + final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), null, null, null, 0); + Assert.assertFalse(format.isFindColumnsFromHeader()); + } + + @Test + public void testHasHeaderRowWithMissingFindColumnsThrowingError() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("At most one of [Property{name='hasHeaderRow', value=true}"); + new CsvInputFormat(null, null, true, false, 0); + } + + @Test + public void testHasHeaderRowWithMissingColumnsReturningItsValue() + { + final CsvInputFormat format = new CsvInputFormat(null, null, true, null, 0); + Assert.assertTrue(format.isFindColumnsFromHeader()); + } } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java index 6971da1fc4c0..8b56e1045fc5 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java @@ -55,18 +55,10 @@ public void testSerde() throws IOException public void testTab() { expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Column[a\t] has a tab, it cannot"); + expectedException.expectMessage("Column[a\t] cannot have the delimiter[\t] in its name"); new DelimitedInputFormat(Collections.singletonList("a\t"), ",", null, null, false, 0); } - @Test - public void testDelimiterLength() - { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("The delimiter should be a single character"); - new DelimitedInputFormat(Collections.singletonList("a\t"), ",", "null", null, false, 0); - } - @Test public void testDelimiterAndListDelimiter() { @@ -79,7 +71,72 @@ public void testDelimiterAndListDelimiter() public void testCustomizeSeparator() { expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Column[a|] has a customize separator: |, it cannot"); + expectedException.expectMessage("Column[a|] cannot have the delimiter[|] in its name"); new DelimitedInputFormat(Collections.singletonList("a|"), ",", "|", null, false, 0); } + + @Test + public void testFindColumnsFromHeaderWithColumnsReturningFalse() + { + final DelimitedInputFormat format = new DelimitedInputFormat( + Collections.singletonList("a"), + null, + "delim", + null, + true, + 0 + ); + Assert.assertFalse(format.isFindColumnsFromHeader()); + } + + @Test + public void testFindColumnsFromHeaderWithMissingColumnsReturningItsValue() + { + final DelimitedInputFormat format = new DelimitedInputFormat( + null, + null, + "delim", + null, + true, + 0 + ); + Assert.assertTrue(format.isFindColumnsFromHeader()); + } + + @Test + public void testMissingFindColumnsFromHeaderWithMissingColumnsThrowingError() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("At least one of [Property{name='hasHeaderRow', value=null}"); + new DelimitedInputFormat(null, null, "delim", null, null, 0); + } + + @Test + public void testMissingFindColumnsFromHeaderWithColumnsReturningFalse() + { + final DelimitedInputFormat format = new DelimitedInputFormat( + Collections.singletonList("a"), + null, + "delim", + null, + null, + 0 + ); + Assert.assertFalse(format.isFindColumnsFromHeader()); + } + + @Test + public void testHasHeaderRowWithMissingFindColumnsThrowingError() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("At most one of [Property{name='hasHeaderRow', value=true}"); + new DelimitedInputFormat(null, null, "delim", true, false, 0); + } + + @Test + public void testHasHeaderRowWithMissingColumnsReturningItsValue() + { + final DelimitedInputFormat format = new DelimitedInputFormat(null, null, "delim", true, null, 0); + Assert.assertTrue(format.isFindColumnsFromHeader()); + } } From a87b0c4acd40d3593bcf2abc5d45f9cef231cdb3 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 16 Jan 2020 15:55:40 -0800 Subject: [PATCH 4/6] remove CSVParser to have a true and only CSVParser --- .../druid/data/input/impl/CSVParser.java | 56 ------------------- .../druid/data/input/impl/CsvInputFormat.java | 21 ++++++- .../java/util/common/parsers/CSVParser.java | 3 +- 3 files changed, 21 insertions(+), 59 deletions(-) delete mode 100644 core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java b/core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java deleted file mode 100644 index 0e98a3720a98..000000000000 --- a/core/src/main/java/org/apache/druid/data/input/impl/CSVParser.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.data.input.impl; - -import com.opencsv.RFC4180Parser; -import com.opencsv.RFC4180ParserBuilder; -import com.opencsv.enums.CSVReaderNullFieldIndicator; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.data.input.impl.DelimitedValueReader.DelimitedValueParser; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -public class CSVParser implements DelimitedValueParser -{ - private static final char SEPERATOR = ','; - private final RFC4180Parser delegate; - - public static RFC4180Parser createOpenCsvParser() - { - return NullHandling.replaceWithDefault() - ? new RFC4180ParserBuilder().withSeparator(SEPERATOR).build() - : new RFC4180ParserBuilder().withFieldAsNull(CSVReaderNullFieldIndicator.EMPTY_SEPARATORS) - .withSeparator(SEPERATOR) - .build(); - } - - CSVParser() - { - delegate = createOpenCsvParser(); - } - - @Override - public List parseLine(String line) throws IOException - { - return Arrays.asList(delegate.parseLine(line)); - } -} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java index 5e9c959fe62c..6d2136b3db3f 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java @@ -22,16 +22,24 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.opencsv.RFC4180Parser; +import com.opencsv.RFC4180ParserBuilder; +import com.opencsv.enums.CSVReaderNullFieldIndicator; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; import javax.annotation.Nullable; import java.io.File; +import java.util.Arrays; import java.util.List; public class CsvInputFormat extends FlatTextInputFormat { + private static final char SEPARATOR = ','; + private static final RFC4180Parser PARSER = createOpenCsvParser(); + @JsonCreator public CsvInputFormat( @JsonProperty("columns") @Nullable List columns, @@ -41,7 +49,7 @@ public CsvInputFormat( @JsonProperty("skipHeaderRows") int skipHeaderRows ) { - super(columns, listDelimiter, ",", hasHeaderRow, findColumnsFromHeader, skipHeaderRows); + super(columns, listDelimiter, String.valueOf(SEPARATOR), hasHeaderRow, findColumnsFromHeader, skipHeaderRows); } @Override @@ -67,7 +75,16 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity getColumns(), isFindColumnsFromHeader(), getSkipHeaderRows(), - new CSVParser() + line -> Arrays.asList(PARSER.parseLine(line)) ); } + + public static RFC4180Parser createOpenCsvParser() + { + return NullHandling.replaceWithDefault() + ? new RFC4180ParserBuilder().withSeparator(SEPARATOR).build() + : new RFC4180ParserBuilder().withFieldAsNull(CSVReaderNullFieldIndicator.EMPTY_SEPARATORS) + .withSeparator(SEPARATOR) + .build(); + } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java index ad88f304d383..49f53dacfc78 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CSVParser.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.opencsv.RFC4180Parser; +import org.apache.druid.data.input.impl.CsvInputFormat; import javax.annotation.Nullable; import java.io.IOException; @@ -29,7 +30,7 @@ public class CSVParser extends AbstractFlatTextFormatParser { - private final RFC4180Parser parser = org.apache.druid.data.input.impl.CSVParser.createOpenCsvParser(); + private final RFC4180Parser parser = CsvInputFormat.createOpenCsvParser(); public CSVParser( @Nullable final String listDelimiter, From d3a6ef82fd8148e827dd05819a33d060c37b38df Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 16 Jan 2020 18:30:40 -0800 Subject: [PATCH 5/6] fix tests --- .../data/input/impl/FlatTextInputFormat.java | 2 +- .../data/input/impl/DelimitedReaderTest.java | 101 +----------------- 2 files changed, 3 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java index 551ebec94a26..2fc9d7f01bba 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/FlatTextInputFormat.java @@ -60,7 +60,7 @@ public abstract class FlatTextInputFormat implements InputFormat ) ).getValue(); } else { - this.findColumnsFromHeader = false; + this.findColumnsFromHeader = findColumnsFromHeader == null ? false : findColumnsFromHeader; } this.skipHeaderRows = skipHeaderRows; Preconditions.checkArgument( diff --git a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java index 5f3bce44dd31..e590ed566a93 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedReaderTest.java @@ -20,13 +20,11 @@ package org.apache.druid.data.input.impl; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; @@ -38,7 +36,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -200,106 +197,12 @@ public void testCustomizeSeparator() throws IOException } } - @Test - public void testQuotes() throws IOException - { - final ByteEntity source = writeData( - ImmutableList.of( - "3\t\"Lets do some \"\"normal\"\" quotes\"\t2018-05-05T10:00:00Z", - "34\t\"Lets do some \"\"normal\"\", quotes with comma\"\t2018-05-06T10:00:00Z", - "343\t\"Lets try \\\"\"it\\\"\" with slash quotes\"\t2018-05-07T10:00:00Z", - "545\t\"Lets try \\\"\"it\\\"\", with slash quotes and comma\"\t2018-05-08T10:00:00Z", - "65\tHere I write \\n slash n\t2018-05-09T10:00:00Z" - ) - ); - final List expectedResults = ImmutableList.of( - new MapBasedInputRow( - DateTimes.of("2018-05-05T10:00:00Z"), - ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", - "3", - "Comment", - "Lets do some \"normal\" quotes", - "Timestamp", - "2018-05-05T10:00:00Z" - ) - ), - new MapBasedInputRow( - DateTimes.of("2018-05-06T10:00:00Z"), - ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", - "34", - "Comment", - "Lets do some \"normal\", quotes with comma", - "Timestamp", - "2018-05-06T10:00:00Z" - ) - ), - new MapBasedInputRow( - DateTimes.of("2018-05-07T10:00:00Z"), - ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", - "343", - "Comment", - "Lets try \\\"it\\\" with slash quotes", - "Timestamp", - "2018-05-07T10:00:00Z" - ) - ), - new MapBasedInputRow( - DateTimes.of("2018-05-08T10:00:00Z"), - ImmutableList.of("Timestamp"), - ImmutableMap.of( - "Value", - "545", - "Comment", - "Lets try \\\"it\\\", with slash quotes and comma", - "Timestamp", - "2018-05-08T10:00:00Z" - ) - ), - new MapBasedInputRow( - DateTimes.of("2018-05-09T10:00:00Z"), - ImmutableList.of("Timestamp"), - ImmutableMap.of("Value", "65", "Comment", "Here I write \\n slash n", "Timestamp", "2018-05-09T10:00:00Z") - ) - ); - final DelimitedInputFormat format = new DelimitedInputFormat( - ImmutableList.of("Value", "Comment", "Timestamp"), - null, - null, - null, - false, - 0 - ); - final InputEntityReader reader = format.createReader( - new InputRowSchema( - new TimestampSpec("Timestamp", "auto", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))), - Collections.emptyList() - ), - source, - null - ); - - try (CloseableIterator iterator = reader.read()) { - final Iterator expectedRowIterator = expectedResults.iterator(); - while (iterator.hasNext()) { - Assert.assertTrue(expectedRowIterator.hasNext()); - Assert.assertEquals(expectedRowIterator.next(), iterator.next()); - } - } - } - @Test public void testRussianTextMess() throws IOException { final ByteEntity source = writeData( ImmutableList.of( - "2019-01-01T00:00:10Z\tname_1\t\"Как говорится: \\\"\"всё течет\t всё изменяется\\\"\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева\"" + "2019-01-01T00:00:10Z\tname_1\tКак говорится: \\\"всё течет всё изменяется\\\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева" ) ); final DelimitedInputFormat format = new DelimitedInputFormat( @@ -317,7 +220,7 @@ public void testRussianTextMess() throws IOException Assert.assertEquals(DateTimes.of("2019-01-01T00:00:10Z"), row.getTimestamp()); Assert.assertEquals("name_1", Iterables.getOnlyElement(row.getDimension("name"))); Assert.assertEquals( - "Как говорится: \\\"всё течет\t всё изменяется\\\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева", + "Как говорится: \\\"всё течет всё изменяется\\\". Украина как всегда обвиняет Россию в собственных проблемах. #ПровокацияКиева", Iterables.getOnlyElement(row.getDimension("Comment")) ); Assert.assertFalse(iterator.hasNext()); From e0f744a7276bebd0bfe6cf8a27e83963ddc034c2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 16 Jan 2020 20:22:01 -0800 Subject: [PATCH 6/6] fix another test --- .../org/apache/druid/data/input/impl/CsvInputFormatTest.java | 4 ++-- .../druid/data/input/impl/DelimitedInputFormatTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java index 90d8a0badd8a..299fb145cf5f 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CsvInputFormatTest.java @@ -61,10 +61,10 @@ public void testDelimiter() } @Test - public void testFindColumnsFromHeaderWithColumnsReturningFalse() + public void testFindColumnsFromHeaderWithColumnsReturningItsValue() { final CsvInputFormat format = new CsvInputFormat(Collections.singletonList("a"), null, null, true, 0); - Assert.assertFalse(format.isFindColumnsFromHeader()); + Assert.assertTrue(format.isFindColumnsFromHeader()); } @Test diff --git a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java index 8b56e1045fc5..a21e34d8cc60 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/DelimitedInputFormatTest.java @@ -76,7 +76,7 @@ public void testCustomizeSeparator() } @Test - public void testFindColumnsFromHeaderWithColumnsReturningFalse() + public void testFindColumnsFromHeaderWithColumnsReturningItsValue() { final DelimitedInputFormat format = new DelimitedInputFormat( Collections.singletonList("a"), @@ -86,7 +86,7 @@ public void testFindColumnsFromHeaderWithColumnsReturningFalse() true, 0 ); - Assert.assertFalse(format.isFindColumnsFromHeader()); + Assert.assertTrue(format.isFindColumnsFromHeader()); } @Test