diff --git a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java index 36a6b670fb7c..ec787bd21856 100644 --- a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import io.druid.java.util.common.parsers.CSVParser; import io.druid.java.util.common.parsers.Parser; @@ -114,7 +113,7 @@ public void verify(List usedCols) @Override public Parser makeParser() { - return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow, skipHeaderRows); + return new CSVParser(listDelimiter, columns, hasHeaderRow, skipHeaderRows); } @Override diff --git a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java index 4facbca3e01c..be360675bab7 100644 --- a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.Parser; @@ -125,8 +124,8 @@ public void verify(List usedCols) public Parser makeParser() { return new DelimitedParser( - Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter), + delimiter, + listDelimiter, columns, hasHeaderRow, skipHeaderRows diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/UriExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/UriExtractionNamespace.java index 8505cf3868e0..445237d2f311 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/UriExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/UriExtractionNamespace.java @@ -29,7 +29,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Throwables; @@ -296,7 +295,7 @@ public CSVFlatDataParser( ); this.parser = new DelegateParser( - new CSVParser(Optional.absent(), columns, hasHeaderRow, skipHeaderRows), + new CSVParser(null, columns, hasHeaderRow, skipHeaderRows), this.keyColumn, this.valueColumn ); @@ -396,8 +395,8 @@ public TSVFlatDataParser( "Must specify more than one column to have a key value pair" ); final DelimitedParser delegate = new DelimitedParser( - Optional.fromNullable(Strings.emptyToNull(delimiter)), - Optional.fromNullable(Strings.emptyToNull(listDelimiter)), + Strings.emptyToNull(delimiter), + Strings.emptyToNull(listDelimiter), hasHeaderRow, skipHeaderRows ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 22fddad373db..c812d17fe67d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.io.Files; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; @@ -39,6 +40,7 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import io.druid.indexing.overlord.SegmentPublishResult; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.parsers.ParseException; import io.druid.query.aggregation.AggregatorFactory; @@ -73,8 +75,10 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class IndexTaskTest { @@ -598,6 +602,139 @@ public void testReportParseException() throws Exception runTask(indexTask); } + @Test + public void testCsvWithHeaderOfEmptyColumns() throws Exception + { + final File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("time,,\n"); + writer.write("2014-01-01T00:00:10Z,a,1\n"); + } + + tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("time,dim,\n"); + writer.write("2014-01-01T00:00:10Z,a,1\n"); + } + + tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write("time,,val\n"); + writer.write("2014-01-01T00:00:10Z,a,1\n"); + } + + final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + null, + null + ), + null, + null, + true, + 0 + ), + null, + 2, + null, + false, + false, + true // report parse exception + ); + + IndexTask indexTask = new IndexTask( + null, + null, + parseExceptionIgnoreSpec, + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + // the order of result segments can be changed because hash shardSpec is used. + // the below loop is to make this test deterministic. + Assert.assertEquals(2, segments.size()); + Assert.assertNotEquals(segments.get(0), segments.get(1)); + + for (int i = 0; i < 2; i++) { + final DataSegment segment = segments.get(i); + final Set dimensions = new HashSet<>(segment.getDimensions()); + + Assert.assertTrue( + StringUtils.format("Actual dimensions: %s", dimensions), + dimensions.equals(Sets.newHashSet("dim", "column_3")) || + dimensions.equals(Sets.newHashSet("column_2", "column_3")) + ); + + Assert.assertEquals(Arrays.asList("val"), segment.getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segment.getInterval()); + } + } + + @Test + public void testCsvWithHeaderOfEmptyTimestamp() throws Exception + { + expectedException.expect(ParseException.class); + expectedException.expectMessage("Unparseable timestamp found!"); + + final File tmpDir = temporaryFolder.newFolder(); + + final File tmpFile = File.createTempFile("druid", "index", tmpDir); + + try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) { + writer.write(",,\n"); + writer.write("2014-01-01T00:00:10Z,a,1\n"); + } + + final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("time", "", ""), + true, + 0 + ), + null, + 2, + null, + false, + false, + true // report parse exception + ); + + IndexTask indexTask = new IndexTask( + null, + null, + parseExceptionIgnoreSpec, + null, + jsonMapper + ); + + runTask(indexTask); + } + private final List runTask(final IndexTask indexTask) throws Exception { final List segments = Lists.newArrayList(); diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java new file mode 100644 index 000000000000..7d3f43c9c336 --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/AbstractFlatTextFormatParser.java @@ -0,0 +1,165 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.parsers; + +import com.google.common.base.Function; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.druid.java.util.common.collect.Utils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public abstract class AbstractFlatTextFormatParser implements Parser +{ + public enum FlatTextFormat + { + CSV(","), + DELIMITED("\t"); + + private final String defaultDelimiter; + + FlatTextFormat(String defaultDelimiter) + { + this.defaultDelimiter = defaultDelimiter; + } + + public String getDefaultDelimiter() + { + return defaultDelimiter; + } + } + + private final String listDelimiter; + private final Splitter listSplitter; + private final Function valueFunction; + private final boolean hasHeaderRow; + private final int maxSkipHeaderRows; + + private List fieldNames = null; + private boolean hasParsedHeader = false; + private int skippedHeaderRows; + private boolean supportSkipHeaderRows; + + public AbstractFlatTextFormatParser( + @Nullable final String listDelimiter, + final boolean hasHeaderRow, + final int maxSkipHeaderRows + ) + { + this.listDelimiter = listDelimiter != null ? listDelimiter : Parsers.DEFAULT_LIST_DELIMITER; + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = ParserUtils.getMultiValueFunction(this.listDelimiter, this.listSplitter); + + this.hasHeaderRow = hasHeaderRow; + this.maxSkipHeaderRows = maxSkipHeaderRows; + } + + @Override + public void startFileFromBeginning() + { + if (hasHeaderRow) { + fieldNames = null; + } + hasParsedHeader = false; + skippedHeaderRows = 0; + supportSkipHeaderRows = true; + } + + public String getListDelimiter() + { + return listDelimiter; + } + + @Override + public List getFieldNames() + { + return fieldNames; + } + + @Override + public void setFieldNames(final Iterable fieldNames) + { + if (fieldNames != null) { + final List fieldsList = Lists.newArrayList(fieldNames); + this.fieldNames = new ArrayList<>(fieldsList.size()); + for (int i = 0; i < fieldsList.size(); i++) { + if (Strings.isNullOrEmpty(fieldsList.get(i))) { + this.fieldNames.add(ParserUtils.getDefaultColumnName(i)); + } else { + this.fieldNames.add(fieldsList.get(i)); + } + } + ParserUtils.validateFields(this.fieldNames); + } + } + + public void setFieldNames(final String header) + { + try { + setFieldNames(parseLine(header)); + } + catch (Exception e) { + throw new ParseException(e, "Unable to parse header [%s]", header); + } + } + + @Override + public Map parse(final String input) + { + if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) { + throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. " + + "Please check the indexTask supports these options."); + } + + try { + List values = parseLine(input); + + if (skippedHeaderRows < maxSkipHeaderRows) { + skippedHeaderRows++; + return null; + } + + if (hasHeaderRow && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(values); + } + hasParsedHeader = true; + return null; + } + + if (fieldNames == null) { + setFieldNames(ParserUtils.generateFieldNames(values.size())); + } + + return Utils.zipMapPartial(fieldNames, Iterables.transform(values, valueFunction)); + } + catch (Exception e) { + throw new ParseException(e, "Unable to parse row [%s]", input); + } + } + + protected abstract List parseLine(String input) throws IOException; +} diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index 180b095ae942..c5d18ae41ced 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -20,73 +20,27 @@ package io.druid.java.util.common.parsers; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import io.druid.java.util.common.collect.Utils; -import java.util.ArrayList; +import javax.annotation.Nullable; +import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Map; -public class CSVParser implements Parser +public class CSVParser extends AbstractFlatTextFormatParser { - private static final Function getValueFunction( - final String listDelimiter, - final Splitter listSplitter - ) - { - return new Function() - { - @Override - public Object apply(String input) - { - if (input.contains(listDelimiter)) { - return Lists.newArrayList( - Iterables.transform( - listSplitter.split(input), - ParserUtils.nullEmptyStringFunction - ) - ); - } else { - return ParserUtils.nullEmptyStringFunction.apply(input); - } - } - }; - } - - private final String listDelimiter; - private final Splitter listSplitter; - private final Function valueFunction; - private final boolean hasHeaderRow; - private final int maxSkipHeaderRows; - private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); - private ArrayList fieldNames = null; - private boolean hasParsedHeader = false; - private int skippedHeaderRows; - private boolean supportSkipHeaderRows; - public CSVParser( - final Optional listDelimiter, + @Nullable final String listDelimiter, final boolean hasHeaderRow, final int maxSkipHeaderRows ) { - this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; - this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); - - this.hasHeaderRow = hasHeaderRow; - this.maxSkipHeaderRows = maxSkipHeaderRows; + super(listDelimiter, hasHeaderRow, maxSkipHeaderRows); } public CSVParser( - final Optional listDelimiter, + @Nullable final String listDelimiter, final Iterable fieldNames, final boolean hasHeaderRow, final int maxSkipHeaderRows @@ -97,87 +51,17 @@ public CSVParser( setFieldNames(fieldNames); } - @VisibleForTesting - CSVParser(final Optional listDelimiter, final String header) - { - this(listDelimiter, false, 0); - - setFieldNames(header); - } - - public String getListDelimiter() - { - return listDelimiter; - } - - @Override - public void startFileFromBeginning() - { - if (hasHeaderRow) { - fieldNames = null; - } - hasParsedHeader = false; - skippedHeaderRows = 0; - supportSkipHeaderRows = true; - } - @Override - public List getFieldNames() - { - return fieldNames; - } - - @Override - public void setFieldNames(final Iterable fieldNames) - { - if (fieldNames != null) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); - } - } - - public void setFieldNames(final String header) + protected List parseLine(String input) throws IOException { - try { - setFieldNames(Arrays.asList(parser.parseLine(header))); - } - catch (Exception e) { - throw new ParseException(e, "Unable to parse header [%s]", header); - } + return Arrays.asList(parser.parseLine(input)); } - @Override - public Map parse(final String input) + @VisibleForTesting + CSVParser(@Nullable final String listDelimiter, final String header) { - if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) { - throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. " - + "Please check the indexTask supports these options."); - } - - try { - String[] values = parser.parseLine(input); - - if (skippedHeaderRows < maxSkipHeaderRows) { - skippedHeaderRows++; - return null; - } - - if (hasHeaderRow && !hasParsedHeader) { - if (fieldNames == null) { - setFieldNames(Arrays.asList(values)); - } - hasParsedHeader = true; - return null; - } - - if (fieldNames == null) { - setFieldNames(ParserUtils.generateFieldNames(values.length)); - } + this(listDelimiter, false, 0); - return Utils.zipMapPartial(fieldNames, Iterables.transform(Lists.newArrayList(values), valueFunction)); - } - catch (Exception e) { - throw new ParseException(e, "Unable to parse row [%s]", input); - } + setFieldNames(header); } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index c052daee17e0..e8477324c8cf 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -20,81 +20,40 @@ package io.druid.java.util.common.parsers; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import io.druid.java.util.common.collect.Utils; -import java.util.ArrayList; +import javax.annotation.Nullable; +import java.io.IOException; import java.util.List; -import java.util.Map; -public class DelimitedParser implements Parser +public class DelimitedParser extends AbstractFlatTextFormatParser { - private static final String DEFAULT_DELIMITER = "\t"; - - private static Function getValueFunction( - final String listDelimiter, - final Splitter listSplitter - ) - { - return (input) -> { - if (input.contains(listDelimiter)) { - return Lists.newArrayList( - Iterables.transform( - listSplitter.split(input), - ParserUtils.nullEmptyStringFunction - ) - ); - } else { - return ParserUtils.nullEmptyStringFunction.apply(input); - } - }; - } - private final String delimiter; - private final String listDelimiter; private final Splitter splitter; - private final Splitter listSplitter; - private final Function valueFunction; - private final boolean hasHeaderRow; - private final int maxSkipHeaderRows; - - private ArrayList fieldNames = null; - private boolean hasParsedHeader = false; - private int skippedHeaderRows; - private boolean supportSkipHeaderRows; public DelimitedParser( - final Optional delimiter, - final Optional listDelimiter, + @Nullable final String delimiter, + @Nullable final String listDelimiter, final boolean hasHeaderRow, final int maxSkipHeaderRows ) { - this.delimiter = delimiter.isPresent() ? delimiter.get() : DEFAULT_DELIMITER; - this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + super(listDelimiter, hasHeaderRow, maxSkipHeaderRows); + this.delimiter = delimiter != null ? delimiter : FlatTextFormat.DELIMITED.getDefaultDelimiter(); Preconditions.checkState( - !this.delimiter.equals(this.listDelimiter), + !this.delimiter.equals(getListDelimiter()), "Cannot have same delimiter and list delimiter of [%s]", this.delimiter ); this.splitter = Splitter.on(this.delimiter); - this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); - this.hasHeaderRow = hasHeaderRow; - this.maxSkipHeaderRows = maxSkipHeaderRows; } public DelimitedParser( - final Optional delimiter, - final Optional listDelimiter, + @Nullable final String delimiter, + @Nullable final String listDelimiter, final Iterable fieldNames, final boolean hasHeaderRow, final int maxSkipHeaderRows @@ -106,7 +65,7 @@ public DelimitedParser( } @VisibleForTesting - DelimitedParser(final Optional delimiter, final Optional listDelimiter, final String header) + DelimitedParser(@Nullable final String delimiter, @Nullable final String listDelimiter, final String header) { this(delimiter, listDelimiter, false, 0); @@ -118,79 +77,9 @@ public String getDelimiter() return delimiter; } - public String getListDelimiter() - { - return listDelimiter; - } - @Override - public void startFileFromBeginning() + protected List parseLine(String input) throws IOException { - if (hasHeaderRow) { - fieldNames = null; - } - hasParsedHeader = false; - skippedHeaderRows = 0; - supportSkipHeaderRows = true; - } - - @Override - public List getFieldNames() - { - return fieldNames; - } - - @Override - public void setFieldNames(final Iterable fieldNames) - { - if (fieldNames != null) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); - } - } - - public void setFieldNames(String header) - { - try { - setFieldNames(splitter.split(header)); - } - catch (Exception e) { - throw new ParseException(e, "Unable to parse header [%s]", header); - } - } - - @Override - public Map parse(final String input) - { - if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) { - throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. " - + "Please check the indexTask supports these options."); - } - - try { - Iterable values = splitter.split(input); - - if (skippedHeaderRows < maxSkipHeaderRows) { - skippedHeaderRows++; - return null; - } - - if (hasHeaderRow && !hasParsedHeader) { - if (fieldNames == null) { - setFieldNames(values); - } - hasParsedHeader = true; - return null; - } - - if (fieldNames == null) { - setFieldNames(ParserUtils.generateFieldNames(Iterators.size(values.iterator()))); - } - - return Utils.zipMapPartial(fieldNames, Iterables.transform(values, valueFunction)); - } - catch (Exception e) { - throw new ParseException(e, "Unable to parse row [%s]", input); - } + return splitter.splitToList(input); } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/ParserUtils.java b/java-util/src/main/java/io/druid/java/util/common/parsers/ParserUtils.java index 7dfe429b3ba9..a62b723752f7 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/ParserUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/ParserUtils.java @@ -20,49 +20,45 @@ package io.druid.java.util.common.parsers; import com.google.common.base.Function; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.Sets; import io.druid.java.util.common.StringUtils; -import org.joda.time.DateTime; import java.util.ArrayList; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public class ParserUtils { - public static final Function nullEmptyStringFunction = new Function() + private static final String DEFAULT_COLUMN_NAME_PREFIX = "column_"; + + public static Function getMultiValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) { - @Override - public String apply(String input) - { - if (input == null || input.isEmpty()) { - return null; + return (input) -> { + if (input != null && input.contains(listDelimiter)) { + return StreamSupport.stream(listSplitter.split(input).spliterator(), false) + .map(Strings::emptyToNull) + .collect(Collectors.toList()); + } else { + return Strings.emptyToNull(input); } - return input; - } - }; + }; + } public static ArrayList generateFieldNames(int length) { - ArrayList names = new ArrayList<>(length); + final ArrayList names = new ArrayList<>(length); for (int i = 0; i < length; ++i) { - names.add("column_" + (i + 1)); + names.add(getDefaultColumnName(i)); } return names; } - /** - * Factored timestamp parsing into its own Parser class, but leaving this here - * for compatibility - * - * @param format - * - * @return - */ - public static Function createTimestampParser(final String format) - { - return TimestampParser.createTimestampParser(format); - } - public static Set findDuplicates(Iterable fieldNames) { Set duplicates = Sets.newHashSet(); @@ -95,4 +91,15 @@ public static String stripQuotes(String input) } return input; } + + /** + * Return a function to generate default column names. + * Note that the postfix for default column names starts from 1. + * + * @return column name generating function + */ + public static String getDefaultColumnName(int ordinal) + { + return DEFAULT_COLUMN_NAME_PREFIX + (ordinal + 1); + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java index 329b02aa944a..c9ca448ce98d 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.druid.java.util.common.collect.Utils; @@ -30,6 +31,8 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** */ @@ -56,12 +59,9 @@ public RegexParser( @Override public Object apply(String input) { - final List retVal = Lists.newArrayList( - Iterables.transform( - listSplitter.split(input), - ParserUtils.nullEmptyStringFunction - ) - ); + final List retVal = StreamSupport.stream(listSplitter.split(input).spliterator(), false) + .map(Strings::emptyToNull) + .collect(Collectors.toList()); if (retVal.size() == 1) { return retVal.get(0); } else { diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java deleted file mode 100644 index f57bd4077a1f..000000000000 --- a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.parsers; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.Map; - -public class CSVParserTest -{ - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Test - public void testValidHeader() - { - String csv = "time,value1,value2"; - final Parser csvParser; - boolean parseable = true; - try { - csvParser = new CSVParser(Optional.fromNullable(null), csv); - } - catch (Exception e) { - parseable = false; - } - finally { - Assert.assertTrue(parseable); - } - } - - @Test - public void testInvalidHeader() - { - String csv = "time,value1,value2,value2"; - final Parser csvParser; - boolean parseable = true; - try { - csvParser = new CSVParser(Optional.fromNullable(null), csv); - } - catch (Exception e) { - parseable = false; - } - finally { - Assert.assertFalse(parseable); - } - } - - @Test - public void testCSVParserWithHeader() - { - String header = "time,value1,value2"; - final Parser csvParser = new CSVParser(Optional.fromNullable(null), header); - String body = "hello,world,foo"; - final Map jsonMap = csvParser.parse(body); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), - jsonMap - ); - } - - @Test - public void testCSVParserWithoutHeader() - { - final Parser csvParser = new CSVParser(Optional.fromNullable(null), false, 0); - String body = "hello,world,foo"; - final Map jsonMap = csvParser.parse(body); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), - jsonMap - ); - } - - @Test - public void testCSVParserWithSkipHeaderRows() - { - final int skipHeaderRows = 2; - final Parser csvParser = new CSVParser( - Optional.absent(), - false, - skipHeaderRows - ); - csvParser.startFileFromBeginning(); - final String[] body = new String[] { - "header,line,1", - "header,line,2", - "hello,world,foo" - }; - int index; - for (index = 0; index < skipHeaderRows; index++) { - Assert.assertNull(csvParser.parse(body[index])); - } - final Map jsonMap = csvParser.parse(body[index]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), - jsonMap - ); - } - - @Test - public void testCSVParserWithHeaderRow() - { - final Parser csvParser = new CSVParser( - Optional.absent(), - true, - 0 - ); - csvParser.startFileFromBeginning(); - final String[] body = new String[] { - "time,value1,value2", - "hello,world,foo" - }; - Assert.assertNull(csvParser.parse(body[0])); - final Map jsonMap = csvParser.parse(body[1]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), - jsonMap - ); - } - - @Test - public void testCSVParserWithDifferentHeaderRows() - { - final Parser csvParser = new CSVParser( - Optional.absent(), - true, - 0 - ); - csvParser.startFileFromBeginning(); - final String[] body = new String[] { - "time,value1,value2", - "hello,world,foo" - }; - Assert.assertNull(csvParser.parse(body[0])); - Map jsonMap = csvParser.parse(body[1]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), - jsonMap - ); - - csvParser.startFileFromBeginning(); - final String[] body2 = new String[] { - "time,value1,value2,value3", - "hello,world,foo,bar" - }; - Assert.assertNull(csvParser.parse(body2[0])); - jsonMap = csvParser.parse(body2[1]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo", "value3", "bar"), - jsonMap - ); - } - - @Test - public void testCSVParserWithoutStartFileFromBeginning() - { - expectedException.expect(UnsupportedOperationException.class); - expectedException.expectMessage( - "hasHeaderRow or maxSkipHeaderRows is not supported. Please check the indexTask supports these options." - ); - - final int skipHeaderRows = 2; - final Parser csvParser = new CSVParser( - Optional.absent(), - false, - skipHeaderRows - ); - final String[] body = new String[] { - "header,line,1", - "header,line,2", - "hello,world,foo" - }; - csvParser.parse(body[0]); - } -} diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java deleted file mode 100644 index ca913253eb61..000000000000 --- a/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.java.util.common.parsers; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.Map; - -public class DelimitedParserTest -{ - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Test - public void testValidHeader() - { - String tsv = "time\tvalue1\tvalue2"; - final Parser delimitedParser; - boolean parseable = true; - try { - delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.absent(), tsv); - } - catch (Exception e) { - parseable = false; - } - finally { - Assert.assertTrue(parseable); - } - } - - @Test - public void testInvalidHeader() - { - String tsv = "time\tvalue1\tvalue2\tvalue2"; - final Parser delimitedParser; - boolean parseable = true; - try { - delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.absent(), tsv); - } - catch (Exception e) { - parseable = false; - } - finally { - Assert.assertFalse(parseable); - } - } - - @Test - public void testTSVParserWithHeader() - { - String header = "time\tvalue1\tvalue2"; - final Parser delimitedParser = new DelimitedParser( - Optional.of("\t"), - Optional.absent(), - header - ); - String body = "hello\tworld\tfoo"; - final Map jsonMap = delimitedParser.parse(body); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), - jsonMap - ); - } - - @Test - public void testTSVParserWithoutHeader() - { - final Parser delimitedParser = new DelimitedParser( - Optional.of("\t"), - Optional.absent(), - false, - 0 - ); - String body = "hello\tworld\tfoo"; - final Map jsonMap = delimitedParser.parse(body); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), - jsonMap - ); - } - - @Test - public void testTSVParserWithSkipHeaderRows() - { - final int skipHeaderRows = 2; - final Parser delimitedParser = new DelimitedParser( - Optional.of("\t"), - Optional.absent(), - false, - skipHeaderRows - ); - delimitedParser.startFileFromBeginning(); - final String[] body = new String[] { - "header\tline\t1", - "header\tline\t2", - "hello\tworld\tfoo" - }; - int index; - for (index = 0; index < skipHeaderRows; index++) { - Assert.assertNull(delimitedParser.parse(body[index])); - } - final Map jsonMap = delimitedParser.parse(body[index]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), - jsonMap - ); - } - - @Test - public void testTSVParserWithHeaderRow() - { - final Parser parser = new DelimitedParser( - Optional.of("\t"), - Optional.absent(), - true, - 0 - ); - parser.startFileFromBeginning(); - final String[] body = new String[] { - "time\tvalue1\tvalue2", - "hello\tworld\tfoo" - }; - Assert.assertNull(parser.parse(body[0])); - final Map jsonMap = parser.parse(body[1]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), - jsonMap - ); - } - - @Test - public void testTSVParserWithDifferentHeaderRows() - { - final Parser csvParser = new DelimitedParser( - Optional.of("\t"), - Optional.absent(), - true, - 0 - ); - csvParser.startFileFromBeginning(); - final String[] body = new String[] { - "time\tvalue1\tvalue2", - "hello\tworld\tfoo" - }; - Assert.assertNull(csvParser.parse(body[0])); - Map jsonMap = csvParser.parse(body[1]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), - jsonMap - ); - - csvParser.startFileFromBeginning(); - final String[] body2 = new String[] { - "time\tvalue1\tvalue2\tvalue3", - "hello\tworld\tfoo\tbar" - }; - Assert.assertNull(csvParser.parse(body2[0])); - jsonMap = csvParser.parse(body2[1]); - Assert.assertEquals( - "jsonMap", - ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo", "value3", "bar"), - jsonMap - ); - } - - @Test - public void testTSVParserWithoutStartFileFromBeginning() - { - expectedException.expect(UnsupportedOperationException.class); - expectedException.expectMessage( - "hasHeaderRow or maxSkipHeaderRows is not supported. Please check the indexTask supports these options." - ); - - final int skipHeaderRows = 2; - final Parser delimitedParser = new DelimitedParser( - Optional.of("\t"), - Optional.absent(), - false, - skipHeaderRows - ); - final String[] body = new String[] { - "header\tline\t1", - "header\tline\t2", - "hello\tworld\tfoo" - }; - delimitedParser.parse(body[0]); - } -} diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/FlatTextFormatParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/FlatTextFormatParserTest.java new file mode 100644 index 000000000000..a3264e300208 --- /dev/null +++ b/java-util/src/test/java/io/druid/java/util/common/parsers/FlatTextFormatParserTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.parsers; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.parsers.AbstractFlatTextFormatParser.FlatTextFormat; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; + +@RunWith(Parameterized.class) +public class FlatTextFormatParserTest +{ + @Parameters(name = "{0}") + public static Collection constructorFeeder() + { + return ImmutableList.of( + new Object[]{FlatTextFormat.CSV}, + new Object[]{FlatTextFormat.DELIMITED} + ); + } + + private static final FlatTextFormatParserFactory parserFactory = new FlatTextFormatParserFactory(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final FlatTextFormat format; + + public FlatTextFormatParserTest(FlatTextFormat format) + { + this.format = format; + } + + @Test + public void testValidHeader() + { + final String header = concat(format, "time", "value1", "value2"); + final Parser parser = parserFactory.get(format, header); + Assert.assertEquals(ImmutableList.of("time", "value1", "value2"), parser.getFieldNames()); + } + + @Test + public void testDuplicatedColumnName() + { + final String header = concat(format, "time", "value1", "value2", "value2"); + + expectedException.expect(ParseException.class); + expectedException.expectMessage(StringUtils.format("Unable to parse header [%s]", header)); + + parserFactory.get(format, header); + } + + @Test + public void testWithHeader() + { + final String header = concat(format, "time", "value1", "value2"); + final Parser parser = parserFactory.get(format, header); + final String body = concat(format, "hello", "world", "foo"); + final Map jsonMap = parser.parse(body); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), + jsonMap + ); + } + + @Test + public void testWithoutHeader() + { + final Parser parser = parserFactory.get(format); + final String body = concat(format, "hello", "world", "foo"); + final Map jsonMap = parser.parse(body); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), + jsonMap + ); + } + + @Test + public void testWithSkipHeaderRows() + { + final int skipHeaderRows = 2; + final Parser parser = parserFactory.get(format, false, skipHeaderRows); + parser.startFileFromBeginning(); + final String[] body = new String[]{ + concat(format, "header", "line", "1"), + concat(format, "header", "line", "2"), + concat(format, "hello", "world", "foo") + }; + int index; + for (index = 0; index < skipHeaderRows; index++) { + Assert.assertNull(parser.parse(body[index])); + } + final Map jsonMap = parser.parse(body[index]); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), + jsonMap + ); + } + + @Test + public void testWithHeaderRow() + { + final Parser parser = parserFactory.get(format, true, 0); + parser.startFileFromBeginning(); + final String[] body = new String[]{ + concat(format, "time", "value1", "value2"), + concat(format, "hello", "world", "foo") + }; + Assert.assertNull(parser.parse(body[0])); + final Map jsonMap = parser.parse(body[1]); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), + jsonMap + ); + } + + @Test + public void testWithHeaderRowOfEmptyColumns() + { + final Parser parser = parserFactory.get(format, true, 0); + parser.startFileFromBeginning(); + final String[] body = new String[]{ + concat(format, "time", "", "value2", ""), + concat(format, "hello", "world", "foo", "bar") + }; + Assert.assertNull(parser.parse(body[0])); + final Map jsonMap = parser.parse(body[1]); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("time", "hello", "column_2", "world", "value2", "foo", "column_4", "bar"), + jsonMap + ); + } + + @Test + public void testWithDifferentHeaderRows() + { + final Parser parser = parserFactory.get(format, true, 0); + parser.startFileFromBeginning(); + final String[] body = new String[]{ + concat(format, "time", "value1", "value2"), + concat(format, "hello", "world", "foo") + }; + Assert.assertNull(parser.parse(body[0])); + Map jsonMap = parser.parse(body[1]); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo"), + jsonMap + ); + + parser.startFileFromBeginning(); + final String[] body2 = new String[]{ + concat(format, "time", "value1", "value2", "value3"), + concat(format, "hello", "world", "foo", "bar") + }; + Assert.assertNull(parser.parse(body2[0])); + jsonMap = parser.parse(body2[1]); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("time", "hello", "value1", "world", "value2", "foo", "value3", "bar"), + jsonMap + ); + } + + @Test + public void testWithoutStartFileFromBeginning() + { + expectedException.expect(UnsupportedOperationException.class); + expectedException.expectMessage( + "hasHeaderRow or maxSkipHeaderRows is not supported. Please check the indexTask supports these options." + ); + + final int skipHeaderRows = 2; + final Parser parser = parserFactory.get(format, false, skipHeaderRows); + final String[] body = new String[]{ + concat(format, "header", "line", "1"), + concat(format, "header", "line", "2"), + concat(format, "hello", "world", "foo") + }; + parser.parse(body[0]); + } + + private static class FlatTextFormatParserFactory + { + public Parser get(FlatTextFormat format) + { + return get(format, false, 0); + } + + public Parser get(FlatTextFormat format, boolean hasHeaderRow, int maxSkipHeaderRows) + { + switch (format) { + case CSV: + return new CSVParser(null, hasHeaderRow, maxSkipHeaderRows); + case DELIMITED: + return new DelimitedParser("\t", null, hasHeaderRow, maxSkipHeaderRows); + default: + throw new IAE("Unknown format[%s]", format); + } + } + + public Parser get(FlatTextFormat format, String header) + { + switch (format) { + case CSV: + return new CSVParser(null, header); + case DELIMITED: + return new DelimitedParser("\t", null, header); + default: + throw new IAE("Unknown format[%s]", format); + } + } + } + + private static String concat(FlatTextFormat format, String ... values) + { + return Arrays.stream(values).collect(Collectors.joining(format.getDefaultDelimiter())); + } +}