diff --git a/api/src/main/java/io/druid/data/input/Firehose.java b/api/src/main/java/io/druid/data/input/Firehose.java index a768e778d81a..4f4c640f1040 100644 --- a/api/src/main/java/io/druid/data/input/Firehose.java +++ b/api/src/main/java/io/druid/data/input/Firehose.java @@ -19,6 +19,7 @@ package io.druid.data.input; +import javax.annotation.Nullable; import java.io.Closeable; /** @@ -46,14 +47,16 @@ public interface Firehose extends Closeable * * @return true if and when there is another row available, false if the stream has dried up */ - public boolean hasMore(); + boolean hasMore(); /** * The next row available. Should only be called if hasMore returns true. + * The return value can be null, which means the caller must skip this row. * * @return The next row */ - public InputRow nextRow(); + @Nullable + InputRow nextRow(); /** * Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is * @@ -74,5 +77,5 @@ public interface Firehose extends Closeable * because of InputRows delivered by prior calls to ##nextRow(). *

*/ - public Runnable commit(); + Runnable commit(); } diff --git a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java index bbe1fc4d2288..a4b09a2010a9 100644 --- a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.CSVParser; import io.druid.java.util.common.parsers.Parser; @@ -35,26 +34,39 @@ public class CSVParseSpec extends ParseSpec { private final String listDelimiter; private final List columns; + private final boolean hasHeaderRow; + private final int skipHeaderRows; @JsonCreator public CSVParseSpec( @JsonProperty("timestampSpec") TimestampSpec timestampSpec, @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { super(timestampSpec, dimensionsSpec); this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); - for (String column : columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } this.columns = columns; - - verify(dimensionsSpec.getDimensionNames()); + this.hasHeaderRow = hasHeaderRow; + this.skipHeaderRows = skipHeaderRows; + + if (columns != null) { + for (String column : columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true."
+ ); + } } @JsonProperty @@ -69,6 +80,18 @@ public List getColumns() return columns; } + @JsonProperty + public boolean isHasHeaderRow() + { + return hasHeaderRow; + } + + @JsonProperty("skipHeaderRows") + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + @Override public void verify(List usedCols) { @@ -80,23 +103,23 @@ public void verify(List usedCols) @Override public Parser makeParser() { - return new CSVParser(Optional.fromNullable(listDelimiter), columns); + return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow, skipHeaderRows); } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns); + return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow, skipHeaderRows); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns); + return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, hasHeaderRow, skipHeaderRows); } public ParseSpec withColumns(List cols) { - return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols); + return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, hasHeaderRow, skipHeaderRows); } } diff --git a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java index 6d7096d7921f..c3383eb351c4 100644 --- a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.Parser; @@ -36,6 +35,8 @@ public class DelimitedParseSpec extends ParseSpec private final String delimiter; private final String listDelimiter; private final List columns; + private final boolean hasHeaderRow; + private final int skipHeaderRows; @JsonCreator public DelimitedParseSpec( @@ -43,20 +44,31 @@ public DelimitedParseSpec( @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("delimiter") String delimiter, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { super(timestampSpec, dimensionsSpec); this.delimiter = delimiter; this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); this.columns = columns; - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + this.hasHeaderRow = hasHeaderRow; + this.skipHeaderRows = skipHeaderRows; + + if (columns != null) { + for (String column : this.columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header" + + " and hasHeaderRow must be set to true." 
+ ); } - - verify(dimensionsSpec.getDimensionNames()); } @JsonProperty("delimiter") @@ -77,6 +89,18 @@ public List getColumns() return columns; } + @JsonProperty + public boolean isHasHeaderRow() + { + return hasHeaderRow; + } + + @JsonProperty("skipHeaderRows") + public int getSkipHeaderRows() + { + return skipHeaderRows; + } + @Override public void verify(List usedCols) { @@ -88,38 +112,79 @@ public void verify(List usedCols) @Override public Parser makeParser() { - Parser retVal = new DelimitedParser( + return new DelimitedParser( Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter) + Optional.fromNullable(listDelimiter), + columns, + hasHeaderRow, + skipHeaderRows ); - retVal.setFieldNames(columns); - return retVal; } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns); + return new DelimitedParseSpec( + spec, + getDimensionsSpec(), + delimiter, + listDelimiter, + columns, + hasHeaderRow, + skipHeaderRows + ); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns); + return new DelimitedParseSpec( + getTimestampSpec(), + spec, + delimiter, + listDelimiter, + columns, + hasHeaderRow, + skipHeaderRows + ); } public ParseSpec withDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns); + return new DelimitedParseSpec( + getTimestampSpec(), + getDimensionsSpec(), + delim, + listDelimiter, + columns, + hasHeaderRow, + skipHeaderRows + ); } public ParseSpec withListDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns); + return new DelimitedParseSpec( + getTimestampSpec(), + getDimensionsSpec(), + delimiter, + delim, + columns, + hasHeaderRow, + skipHeaderRows + ); } public ParseSpec withColumns(List cols) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols); + return new DelimitedParseSpec( + getTimestampSpec(), + getDimensionsSpec(), + delimiter, + listDelimiter, + cols, + hasHeaderRow, + skipHeaderRows + ); } } diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index 97e33f04a894..648831bece31 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -19,7 +19,6 @@ package io.druid.data.input.impl; -import com.google.common.base.Throwables; import io.druid.data.input.Firehose; import io.druid.data.input.InputRow; import io.druid.utils.Runnables; @@ -27,6 +26,7 @@ import java.io.IOException; import java.util.Iterator; +import java.util.NoSuchElementException; /** */ @@ -50,7 +50,7 @@ public FileIteratingFirehose( public boolean hasMore() { while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { - lineIterator = lineIterators.next(); + lineIterator = getNextLineIterator(); } return lineIterator != null && lineIterator.hasNext(); @@ -59,21 +59,22 @@ public boolean hasMore() @Override public InputRow nextRow() { - try { - if (lineIterator == null || !lineIterator.hasNext()) { - // Close old streams, maybe. 
- if (lineIterator != null) { - lineIterator.close(); - } + if (!hasMore()) { + throw new NoSuchElementException(); + } - lineIterator = lineIterators.next(); - } + return parser.parse(lineIterator.next()); + } - return parser.parse(lineIterator.next()); - } - catch (Exception e) { - throw Throwables.propagate(e); + private LineIterator getNextLineIterator() + { + if (lineIterator != null) { + lineIterator.close(); } + + final LineIterator iterator = lineIterators.next(); + parser.startFileFromBeginning(); + return iterator; } @Override diff --git a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java index 6a13fcb7bd19..a640ef10ac41 100644 --- a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java @@ -22,12 +22,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; - import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.InputRow; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.common.parsers.Parser; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; @@ -124,18 +124,30 @@ private Map buildStringKeyMap(ByteBuffer input) return theMap; } - private Map parseString(String inputString) + public void startFileFromBeginning() { - return parser.parse(inputString); + parser.startFileFromBeginning(); } - public InputRow parse(String input) + @Nullable + public InputRow parse(@Nullable String input) { return parseMap(parseString(input)); } - private InputRow parseMap(Map theMap) + @Nullable + private Map parseString(@Nullable String inputString) { + return parser.parse(inputString); + } + + @Nullable + private InputRow parseMap(@Nullable Map theMap) + { + // A null map means the row was a header or was otherwise skipped (given the proper configuration), so return null to tell the caller to skip it + if (theMap == null) { + return null; + } return mapParser.parse(theMap); } } diff --git a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java index e6740cb63f4b..ec1fdb2a0402 100644 --- a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import io.druid.java.util.common.parsers.Parser; import java.util.List; diff --git a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java index 7d99a2b804de..e9143344c64b 100644 --- a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java @@ -41,7 +41,9 @@ public void testColumnMissing() throws Exception Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } @@ -60,7 +62,9 @@ public void testComma() throws Exception Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java index 1ebad8c64141..e94ba373fa67 ---
a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java @@ -40,7 +40,9 @@ public void testSerde() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("abc")), null, null), "\u0001", "\u0002", - Arrays.asList("abc") + Arrays.asList("abc"), + false, + 0 ); final DelimitedParseSpec serde = jsonMapper.readValue( jsonMapper.writeValueAsString(spec), @@ -71,7 +73,9 @@ public void testColumnMissing() throws Exception ), ",", " ", - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } @@ -91,12 +95,15 @@ public void testComma() throws Exception ), ",", null, - Arrays.asList("a") + Arrays.asList("a"), + false, + 0 ); } - @Test(expected = NullPointerException.class) - public void testDefaultColumnList(){ + @Test(expected = IllegalArgumentException.class) + public void testDefaultColumnList() + { final DelimitedParseSpec spec = new DelimitedParseSpec( new TimestampSpec( "timestamp", @@ -110,8 +117,9 @@ public void testDefaultColumnList(){ ), ",", null, - // pass null columns not allowed - null + null, + false, + 0 ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java index 87f5d20fcd5c..30239233bcbd 100644 --- a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java +++ b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java @@ -19,67 +19,107 @@ package io.druid.data.input.impl; -import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; - -import io.druid.java.util.common.Pair; -import junit.framework.Assert; - +import io.druid.data.input.InputRow; import org.apache.commons.io.LineIterator; +import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import java.io.IOException; import java.io.StringReader; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +@RunWith(Parameterized.class) public class FileIteratingFirehoseTest { - private static final List>> fixtures = ImmutableList.of( - Pair.of(new String[]{"2000,foo"}, ImmutableList.of("foo")), - Pair.of(new String[]{"2000,foo\n2000,bar\n"}, ImmutableList.of("foo", "bar")), - Pair.of(new String[]{"2000,foo\n2000,bar\n", "2000,baz"}, ImmutableList.of("foo", "bar", "baz")), - Pair.of(new String[]{"2000,foo\n2000,bar\n", "", "2000,baz"}, ImmutableList.of("foo", "bar", "baz")), - Pair.of(new String[]{"2000,foo\n2000,bar\n", "", "2000,baz", ""}, ImmutableList.of("foo", "bar", "baz")), - Pair.of(new String[]{""}, ImmutableList.of()), - Pair.of(new String[]{}, ImmutableList.of()) - ); + @Parameters(name = "{0}, {1}") + public static Collection constructorFeeder() throws IOException + { + final List> inputTexts = ImmutableList.of( + ImmutableList.of("2000,foo"), + ImmutableList.of("2000,foo\n2000,bar\n"), + ImmutableList.of("2000,foo\n2000,bar\n", "2000,baz"), + ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz"), + ImmutableList.of("2000,foo\n2000,bar\n", "", "2000,baz", ""), + ImmutableList.of("2000,foo\n2000,bar\n2000,baz", "", "2000,baz", "2000,foo\n2000,bar\n3000,baz"), + ImmutableList.of(""), + ImmutableList.of() + ); 
+ + final List args = new ArrayList<>(); + for (int numSkipHeadRows = 0; numSkipHeadRows < 3; numSkipHeadRows++) { + for (List texts : inputTexts) { + args.add(new Object[] {texts, numSkipHeadRows}); + } + } + + return args; + } + + private final StringInputRowParser parser; + private final List inputs; + private final List expectedResults; + + public FileIteratingFirehoseTest(List texts, int numSkipHeaderRows) + { + parser = new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null), + ",", + ImmutableList.of("ts", "x"), + false, + numSkipHeaderRows + ), + null + ); + + this.inputs = texts; + this.expectedResults = inputs.stream() + .map(input -> input.split("\n")) + .flatMap(lines -> { + final List filteredLines = Arrays.asList(lines).stream() + .filter(line -> line.length() > 0) + .map(line -> line.split(",")[1]) + .collect(Collectors.toList()); + + final int numRealSkippedRows = Math.min(filteredLines.size(), numSkipHeaderRows); + IntStream.range(0, numRealSkippedRows).forEach(i -> filteredLines.set(i, null)); + return filteredLines.stream(); + }) + .collect(Collectors.toList()); + } @Test public void testFirehose() throws Exception { - for (Pair> fixture : fixtures) { - final List lineIterators = Lists.transform( - Arrays.asList(fixture.lhs), - new Function() - { - @Override - public LineIterator apply(String s) - { - return new LineIterator(new StringReader(s)); - } - } - ); - - final StringInputRowParser parser = new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("ts", "auto", null), - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null), - ",", - ImmutableList.of("ts", "x") - ), - null - ); + final List lineIterators = inputs.stream() + .map(s -> new LineIterator(new StringReader(s))) + .collect(Collectors.toList()); - final FileIteratingFirehose firehose = new FileIteratingFirehose(lineIterators.iterator(), parser); + try (final FileIteratingFirehose firehose = new FileIteratingFirehose(lineIterators.iterator(), parser)) { final List results = Lists.newArrayList(); while (firehose.hasMore()) { - results.add(Joiner.on("|").join(firehose.nextRow().getDimension("x"))); + final InputRow inputRow = firehose.nextRow(); + if (inputRow == null) { + results.add(null); + } else { + results.add(Joiner.on("|").join(inputRow.getDimension("x"))); + } } - Assert.assertEquals(fixture.rhs, results); + Assert.assertEquals(expectedResults, results); } } } diff --git a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java index 4b38453fe35f..01da54cd0fd0 100644 --- a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java @@ -45,7 +45,9 @@ public void testDuplicateNames() throws Exception ), ",", " ", - Arrays.asList("a", "b") + Arrays.asList("a", "b"), + false, + 0 ); } @@ -65,7 +67,9 @@ public void testDimAndDimExcluOverlap() throws Exception ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false, + 0 ); } @@ -85,7 +89,9 @@ public void testDimExclusionDuplicate() throws Exception ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false, + 0 ); } } diff --git a/docs/content/development/extensions-core/lookups-cached-global.md b/docs/content/development/extensions-core/lookups-cached-global.md index 72d346201c1d..385cc07ad7cd 100644 --- 
a/docs/content/development/extensions-core/lookups-cached-global.md +++ b/docs/content/development/extensions-core/lookups-cached-global.md @@ -195,12 +195,17 @@ The `namespaceParseSpec` can be one of a number of values. Each of the examples Only ONE file which matches the search will be used. For most implementations, the discriminator for choosing the URIs is by whichever one reports the most recent timestamp for its modification time. ### csv lookupParseSpec - |Parameter|Description|Required|Default| |---------|-----------|--------|-------| -|`columns`|The list of columns in the csv file|yes|`null`| +|`columns`|The list of columns in the csv file|no if `hasHeaderRow` is set|`null`| |`keyColumn`|The name of the column containing the key|no|The first column| |`valueColumn`|The name of the column containing the value|no|The second column| +|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false| +|`skipHeaderRows`|Number of header rows to be skipped|no|0| + +If both `skipHeaderRows` and `hasHeaderRow` options are set, `skipHeaderRows` is first applied. For example, if you set +`skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will skip the first two lines and then extract column information +from the third line. *example input* @@ -222,15 +227,20 @@ truck,something3,buck ``` ### tsv lookupParseSpec - |Parameter|Description|Required|Default| |---------|-----------|--------|-------| -|`columns`|The list of columns in the csv file|yes|`null`| +|`columns`|The list of columns in the tsv file|no if `hasHeaderRow` is set|`null`| |`keyColumn`|The name of the column containing the key|no|The first column| |`valueColumn`|The name of the column containing the value|no|The second column| |`delimiter`|The delimiter in the file|no|tab (`\t`)| |`listDelimiter`|The list delimiter in the file|no| (`\u0001`)| +|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false| +|`skipHeaderRows`|Number of header rows to be skipped|no|0| + +If both `skipHeaderRows` and `hasHeaderRow` options are set, `skipHeaderRows` is first applied. For example, if you set +`skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will skip the first two lines and then extract column information +from the third line. *example input* diff --git a/docs/content/ingestion/data-formats.md b/docs/content/ingestion/data-formats.md index 95807cc60a6a..859f75e12be8 100644 --- a/docs/content/ingestion/data-formats.md +++ b/docs/content/ingestion/data-formats.md @@ -72,10 +72,8 @@ If you have nested JSON, [Druid can automatically flatten it for you](flatten-js ### CSV -Since the CSV data cannot contain the column names (no header is allowed), these must be added before that data can be processed: - ```json - "parseSpec":{ + "parseSpec": { "format" : "csv", "timestampSpec" : { "column" : "timestamp" @@ -87,12 +85,27 @@ Since the CSV data cannot contain the column names (no header is allowed), these } ``` -The `columns` field must match the columns of your input data in the same order. +#### CSV Index Tasks + +If your input files contain a header, the `columns` field is optional and you don't need to set it. +Instead, you can set the `hasHeaderRow` field to true, which makes Druid automatically extract the column information from the header. +Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data, in the same order.
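To make the header-driven configuration concrete, here is a minimal sketch (not part of this patch) of the same spec built directly against the new `CSVParseSpec` constructor; the `"timestamp"` column name is an assumed placeholder. It mirrors the `testCSVFileWithHeader` case added later in this patch.

```java
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimestampSpec;

public class CsvHeaderSpecExample
{
  public static CSVParseSpec headerDrivenSpec()
  {
    // columns is left null and hasHeaderRow is set to true, so the column
    // names are taken from the first row of each input file.
    return new CSVParseSpec(
        new TimestampSpec("timestamp", "auto", null), // assumed timestamp column
        new DimensionsSpec(null, null, null),         // dimensions discovered from the header
        null,                                         // default list delimiter
        null,                                         // columns omitted; read from the header
        true,                                         // hasHeaderRow
        0                                             // skipHeaderRows
    );
  }
}
```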
+ + Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. If both `skipHeaderRows` and `hasHeaderRow` options are set, + `skipHeaderRows` is first applied. For example, if you set `skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will + skip the first two lines and then extract column information from the third line. -### TSV + Note that `hasHeaderRow` and `skipHeaderRows` are effective only for non-Hadoop batch index tasks. Other types of index + tasks will fail with an exception. + + #### Other CSV Ingestion Tasks + + The `columns` field must be included, and the order of the fields must match the columns of your input data. + + ### TSV (Delimited) ```json - "parseSpec":{ + "parseSpec": { "format" : "tsv", "timestampSpec" : { "column" : "timestamp" @@ -105,10 +118,25 @@ The `columns` field must match the columns of your input data in the same order. } ``` -The `columns` field must match the columns of your input data in the same order. - Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed. #### TSV (Delimited) Index Tasks + +If your input files contain a header, the `columns` field is optional and you don't need to set it. +Instead, you can set the `hasHeaderRow` field to true, which makes Druid automatically extract the column information from the header. +Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data, in the same order. + +Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. If both `skipHeaderRows` and `hasHeaderRow` options are set, +`skipHeaderRows` is first applied. For example, if you set `skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will +skip the first two lines and then extract column information from the third line. + +Note that `hasHeaderRow` and `skipHeaderRows` are effective only for non-Hadoop batch index tasks. Other types of index +tasks will fail with an exception. + +#### Other TSV (Delimited) Ingestion Tasks + +The `columns` field must be included, and the order of the fields must match the columns of your input data.
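As a companion to the TSV notes above, here is a minimal sketch (illustrative only; the `"timestamp"` column name is assumed) of a `DelimitedParseSpec` that skips the first two lines of each file and then reads the header from the third, matching the example described above:

```java
import io.druid.data.input.impl.DelimitedParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimestampSpec;

public class TsvSkipHeaderSpecExample
{
  public static DelimitedParseSpec skipThenReadHeaderSpec()
  {
    // skipHeaderRows is applied first: lines 1 and 2 of each file are dropped,
    // then line 3 is consumed as the header row.
    return new DelimitedParseSpec(
        new TimestampSpec("timestamp", "auto", null), // assumed timestamp column
        new DimensionsSpec(null, null, null),         // dimensions discovered from the header
        "\t",                                         // field delimiter
        null,                                         // default list delimiter
        null,                                         // columns omitted; read from the header
        true,                                         // hasHeaderRow
        2                                             // skipHeaderRows
    );
  }
}
```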
+ ### Regex ```json diff --git a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java index 46d99cbbcc96..d9a9966dce21 100644 --- a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java +++ b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java @@ -96,7 +96,9 @@ public static Iterable constructorFeeder() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim", "keys", "values")), null, null), "\t", ",", - Arrays.asList("ts", "dim", "keys", "values") + Arrays.asList("ts", "dim", "keys", "values"), + false, + 0 ) , "utf8" ); diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java index a7c27c66638e..1c6bfd0447ef 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java @@ -27,13 +27,13 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - import io.druid.guice.annotations.Json; import io.druid.java.util.common.IAE; import io.druid.java.util.common.UOE; @@ -41,7 +41,6 @@ import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.JSONParser; import io.druid.java.util.common.parsers.Parser; - import org.joda.time.Period; import javax.annotation.Nullable; @@ -264,7 +263,9 @@ public static class CSVFlatDataParser implements FlatDataParser public CSVFlatDataParser( @JsonProperty("columns") List columns, @JsonProperty("keyColumn") final String keyColumn, - @JsonProperty("valueColumn") final String valueColumn + @JsonProperty("valueColumn") final String valueColumn, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { Preconditions.checkArgument( @@ -293,12 +294,22 @@ public CSVFlatDataParser( ); this.parser = new DelegateParser( - new CSVParser(Optional.absent(), columns), + new CSVParser(Optional.absent(), columns, hasHeaderRow, skipHeaderRows), this.keyColumn, this.valueColumn ); } + @VisibleForTesting + CSVFlatDataParser( + List columns, + String keyColumn, + String valueColumn + ) + { + this(columns, keyColumn, valueColumn, false, 0); + } + @JsonProperty public List getColumns() { @@ -373,7 +384,9 @@ public TSVFlatDataParser( @JsonProperty("delimiter") String delimiter, @JsonProperty("listDelimiter") String listDelimiter, @JsonProperty("keyColumn") final String keyColumn, - @JsonProperty("valueColumn") final String valueColumn + @JsonProperty("valueColumn") final String valueColumn, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow, + @JsonProperty("skipHeaderRows") int skipHeaderRows ) { Preconditions.checkArgument( @@ -382,7 +395,9 @@ public TSVFlatDataParser( ); final 
DelimitedParser delegate = new DelimitedParser( Optional.fromNullable(Strings.emptyToNull(delimiter)), - Optional.fromNullable(Strings.emptyToNull(listDelimiter)) + Optional.fromNullable(Strings.emptyToNull(listDelimiter)), + hasHeaderRow, + skipHeaderRows ); Preconditions.checkArgument( !(Strings.isNullOrEmpty(keyColumn) ^ Strings.isNullOrEmpty(valueColumn)), @@ -410,6 +425,18 @@ public TSVFlatDataParser( this.parser = new DelegateParser(delegate, this.keyColumn, this.valueColumn); } + @VisibleForTesting + TSVFlatDataParser( + List columns, + String delimiter, + String listDelimiter, + String keyColumn, + String valueColumn + ) + { + this(columns, delimiter, listDelimiter, keyColumn, valueColumn, false, 0); + } + @JsonProperty public List getColumns() { diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 7201bd119133..2746dd9c004e 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -346,7 +346,9 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map data(){ + public static Collection data() + { int[] first = new int[1]; Arrays.fill(first, 13); int[] second = new int[6]; @@ -116,7 +117,12 @@ public DetermineHashedPartitionsJobTest( new DelimitedParseSpec( new TimestampSpec("ts", null, null), new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("market", "quality", "placement", "placementish")), + DimensionsSpec.getDefaultSchemas(ImmutableList.of( + "market", + "quality", + "placement", + "placementish" + )), null, null ), @@ -129,7 +135,9 @@ public DetermineHashedPartitionsJobTest( "placement", "placementish", "index" - ) + ), + false, + 0 ), null ), @@ -176,7 +184,8 @@ public DetermineHashedPartitionsJobTest( } @Test - public void testDetermineHashedPartitions(){ + public void testDetermineHashedPartitions() + { DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig); determineHashedPartitionsJob.run(); Map> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs(); @@ -184,8 +193,8 @@ public void testDetermineHashedPartitions(){ expectedNumTimeBuckets, shardSpecs.entrySet().size() ); - int i=0; - for(Map.Entry> entry : shardSpecs.entrySet()) { + int i = 0; + for (Map.Entry> entry : shardSpecs.entrySet()) { Assert.assertEquals( expectedNumOfShards[i++], entry.getValue().size(), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 9fc2e8eb0400..bf3c1c8713fe 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -227,7 +227,9 @@ public DeterminePartitionsJobTest( new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null), null, - ImmutableList.of("timestamp", "host", "country", "visited_num") + ImmutableList.of("timestamp", "host", "country", "visited_num"), + false, + 0 ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 250b9dd8b045..07edcd9e32a4 100644 --- 
a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -142,7 +142,9 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ), null ), @@ -188,7 +190,9 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ) ), null, @@ -233,7 +237,9 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ), null ), @@ -289,7 +295,9 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ) ), null, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 8af6c470ff69..17bda8f4bcb7 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -77,7 +77,9 @@ public void setup() throws Exception new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false, + 0 ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index f1c7d558c909..a8cb0216705e 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -265,7 +265,9 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(null, null, null), null, - ImmutableList.of("timestamp", "host", "visited") + ImmutableList.of("timestamp", "host", "visited"), + false, + 0 ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 4b9adba54c4f..f7ff27c9c3a2 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -164,7 +164,9 @@ public InputStream openStream() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TestIndex.DIMENSIONS)), null, null), "\t", "\u0001", - Arrays.asList(TestIndex.COLUMNS) + 
Arrays.asList(TestIndex.COLUMNS), + false, + 0 ), null ), diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 75a86d8b84bb..fd36470b111d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -272,6 +272,11 @@ private Map> determineShardSpecs( while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); + // The null inputRow means the caller must skip this row. + if (inputRow == null) { + continue; + } + final Interval interval; if (determineIntervals) { interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 81e85c70dba1..3ed1942af863 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; @@ -74,6 +75,24 @@ public class IndexTaskTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null + ), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val"), + false, + 0 + ); + private final IndexSpec indexSpec; private final ObjectMapper jsonMapper; private IndexMerger indexMerger; @@ -107,7 +126,7 @@ public void testDeterminePartitions() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, false, false), + createIngestionSpec(tmpDir, null, null, 2, null, false, false), null, jsonMapper ); @@ -145,7 +164,7 @@ public void testForceExtendableShardSpecs() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, true, false), + createIngestionSpec(tmpDir, null, null, 2, null, true, false), null, jsonMapper ); @@ -185,6 +204,7 @@ public void testWithArbitraryGranularity() throws Exception null, createIngestionSpec( tmpDir, + null, new ArbitraryGranularitySpec( Granularities.MINUTE, Arrays.asList(new Interval("2014/2015")) @@ -220,6 +240,7 @@ public void testIntervalBucketing() throws Exception null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.HOUR, @@ -254,7 +275,7 @@ public void testNumShardsProvided() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, 1, false, false), + createIngestionSpec(tmpDir, null, null, null, 1, false, false), null, jsonMapper ); @@ -285,7 +306,7 @@ public void testAppendToExisting() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, false, true), + createIngestionSpec(tmpDir, null, null, 2, null, false, true), null, 
jsonMapper ); @@ -323,6 +344,7 @@ public void testIntervalNotSpecified() throws Exception null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, @@ -357,6 +379,112 @@ public void testIntervalNotSpecified() throws Exception Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); } + @Test + public void testCSVFileWithHeader() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + null, + true, + 0 + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + + @Test + public void testCSVFileWithHeaderColumnOverride() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("time", "dim", "val"), + true, + 0 + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("dim"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + private final List runTask(final IndexTask indexTask) throws Exception { final List segments = Lists.newArrayList(); @@ -434,6 +562,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException private IndexTask.IndexIngestionSpec createIngestionSpec( File baseDir, + ParseSpec parseSpec, GranularitySpec granularitySpec, Integer targetPartitionSize, Integer numShards, @@ -446,20 +575,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( "test", jsonMapper.convertValue( new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ), + parseSpec != null ? 
parseSpec : DEFAULT_PARSE_SPEC, null ), Map.class diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index e45ed7e6acc7..957e72560d2a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -19,6 +19,7 @@ package io.druid.java.util.common.parsers; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Splitter; @@ -33,24 +34,17 @@ public class CSVParser implements Parser { - private final String listDelimiter; - private final Splitter listSplitter; - private final Function valueFunction; - - private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); - - private ArrayList fieldNames = null; - - public CSVParser(final Optional listDelimiter) + private static final Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) { - this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; - this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() + return new Function() { @Override public Object apply(String input) { - if (input.contains(CSVParser.this.listDelimiter)) { + if (input.contains(listDelimiter)) { return Lists.newArrayList( Iterables.transform( listSplitter.split(input), @@ -64,16 +58,49 @@ public Object apply(String input) }; } - public CSVParser(final Optional listDelimiter, final Iterable fieldNames) + private final String listDelimiter; + private final Splitter listSplitter; + private final Function valueFunction; + private final boolean hasHeaderRow; + private final int maxSkipHeaderRows; + + private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); + + private ArrayList fieldNames = null; + private boolean hasParsedHeader = false; + private int skippedHeaderRows; + private boolean supportSkipHeaderRows; + + public CSVParser( + final Optional listDelimiter, + final boolean hasHeaderRow, + final int maxSkipHeaderRows + ) { - this(listDelimiter); + this.listDelimiter = listDelimiter.isPresent() ? 
listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + + this.hasHeaderRow = hasHeaderRow; + this.maxSkipHeaderRows = maxSkipHeaderRows; + } + + public CSVParser( + final Optional listDelimiter, + final Iterable fieldNames, + final boolean hasHeaderRow, + final int maxSkipHeaderRows + ) + { + this(listDelimiter, hasHeaderRow, maxSkipHeaderRows); setFieldNames(fieldNames); } - public CSVParser(final Optional listDelimiter, final String header) + @VisibleForTesting + CSVParser(final Optional listDelimiter, final String header) { - this(listDelimiter); + this(listDelimiter, false, 0); setFieldNames(header); } @@ -83,6 +110,14 @@ public String getListDelimiter() return listDelimiter; } + @Override + public void startFileFromBeginning() + { + supportSkipHeaderRows = true; + hasParsedHeader = false; + skippedHeaderRows = 0; + } + @Override public List getFieldNames() { @@ -92,8 +127,10 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(final String header) @@ -109,9 +146,27 @@ public void setFieldNames(final String header) @Override public Map parse(final String input) { + if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) { + throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. " + + "Please check that the indexTask supports these options."); + } + try { String[] values = parser.parseLine(input); + if (skippedHeaderRows < maxSkipHeaderRows) { + skippedHeaderRows++; + return null; + } + + if (hasHeaderRow && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(Arrays.asList(values)); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(values.length)); } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index c37c4bda989c..27114df31e95 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -19,6 +19,7 @@ package io.druid.java.util.common.parsers; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -36,16 +37,44 @@ public class DelimitedParser implements Parser { private static final String DEFAULT_DELIMITER = "\t"; + private static Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) + { + return (input) -> { + if (input.contains(listDelimiter)) { + return Lists.newArrayList( + Iterables.transform( + listSplitter.split(input), + ParserUtils.nullEmptyStringFunction + ) + ); + } else { + return ParserUtils.nullEmptyStringFunction.apply(input); + } + }; + } + private final String delimiter; private final String listDelimiter; - private final Splitter splitter; private final Splitter listSplitter; private final Function valueFunction; + private final boolean hasHeaderRow; + private final int maxSkipHeaderRows; private ArrayList
fieldNames = null; + private boolean hasParsedHeader = false; + private int skippedHeaderRows; + private boolean supportSkipHeaderRows; - public DelimitedParser(final Optional delimiter, Optional listDelimiter) + public DelimitedParser( + final Optional delimiter, + final Optional listDelimiter, + final boolean hasHeaderRow, + final int maxSkipHeaderRows + ) { this.delimiter = delimiter.isPresent() ? delimiter.get() : DEFAULT_DELIMITER; this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; @@ -58,40 +87,28 @@ public DelimitedParser(final Optional delimiter, Optional listDe this.splitter = Splitter.on(this.delimiter); this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() - { - @Override - public Object apply(String input) - { - if (input.contains(DelimitedParser.this.listDelimiter)) { - return Lists.newArrayList( - Iterables.transform( - listSplitter.split(input), - ParserUtils.nullEmptyStringFunction - ) - ); - } else { - return ParserUtils.nullEmptyStringFunction.apply(input); - } - } - }; + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + this.hasHeaderRow = hasHeaderRow; + this.maxSkipHeaderRows = maxSkipHeaderRows; } public DelimitedParser( final Optional delimiter, final Optional listDelimiter, - final Iterable fieldNames + final Iterable fieldNames, + final boolean hasHeaderRow, + final int maxSkipHeaderRows ) { - this(delimiter, listDelimiter); + this(delimiter, listDelimiter, hasHeaderRow, maxSkipHeaderRows); setFieldNames(fieldNames); } - public DelimitedParser(final Optional delimiter, final Optional listDelimiter, final String header) - + @VisibleForTesting + DelimitedParser(final Optional delimiter, final Optional listDelimiter, final String header) { - this(delimiter, listDelimiter); + this(delimiter, listDelimiter, false, 0); setFieldNames(header); } @@ -106,6 +123,14 @@ public String getListDelimiter() return listDelimiter; } + @Override + public void startFileFromBeginning() + { + supportSkipHeaderRows = true; + hasParsedHeader = false; + skippedHeaderRows = 0; + } + @Override public List getFieldNames() { @@ -115,8 +140,10 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(String header) @@ -132,9 +159,27 @@ public void setFieldNames(String header) @Override public Map parse(final String input) { + if (!supportSkipHeaderRows && (hasHeaderRow || maxSkipHeaderRows > 0)) { + throw new UnsupportedOperationException("hasHeaderRow or maxSkipHeaderRows is not supported. 
" + + "Please check the indexTask supports these options."); + } + try { Iterable values = splitter.split(input); + if (skippedHeaderRows < maxSkipHeaderRows) { + skippedHeaderRows++; + return null; + } + + if (hasHeaderRow && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(values); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(Iterators.size(values.iterator()))); } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java index 2717d2db915a..d23d599d3bca 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java @@ -238,7 +238,7 @@ public enum FieldType /** * Specifies a field to be added to the parsed object Map, using JsonPath notation. - * + *

* See https://github.com/jayway/JsonPath for more information. */ public static class FieldSpec @@ -281,5 +281,4 @@ public String getExpr() return expr; } } - } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java index 78b29de72244..8cc6fd6a1d6b 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java @@ -19,6 +19,7 @@ package io.druid.java.util.common.parsers; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -28,11 +29,21 @@ public interface Parser { /** - * Parse a String into a Map. + * This method may or may not get called at the start of reading of every file depending on the type of IndexTasks. + * The parser state should be reset if exists. + */ + default void startFileFromBeginning() + { + + } + + /** + * Parse a String into a Map. The result can be null which means the given input string will be ignored. * * @throws ParseException if the String cannot be parsed */ - public Map parse(String input); + @Nullable + Map parse(String input); /** * Set the fieldNames that you expect to see in parsed Maps. Deprecated; Parsers should not, in general, be @@ -40,12 +51,12 @@ public interface Parser * parser) and those parsers have their own way of setting field names. */ @Deprecated - public void setFieldNames(Iterable fieldNames); + void setFieldNames(Iterable fieldNames); /** * Returns the fieldNames that we expect to see in parsed Maps, if known, or null otherwise. Deprecated; Parsers * should not, in general, be expected to know what fields they will return. */ @Deprecated - public List getFieldNames(); + List getFieldNames(); } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java index 87b6321aa20f..329b02aa944a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java @@ -117,7 +117,6 @@ public void setFieldNames(Iterable fieldNames) } @Override - public List getFieldNames() { return fieldNames; diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java index 7e45bf23d941..fede1aa1f98c 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java @@ -53,6 +53,12 @@ public Map parse(String input) return retVal; } + @Override + public void startFileFromBeginning() + { + baseParser.startFileFromBeginning(); + } + @Override public void setFieldNames(Iterable fieldNames) { diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java index 8121fd9fd0c7..37a589b276a2 100644 --- a/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/parsers/CSVParserTest.java @@ -21,7 +21,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import junit.framework.Assert; +import org.junit.Assert; import org.junit.Test; import java.util.Map; @@ -80,7 +80,7 @@ public void testCSVParserWithHeader() @Test 
public void testCSVParserWithoutHeader() { - final Parser csvParser = new CSVParser(Optional.fromNullable(null)); + final Parser csvParser = new CSVParser(Optional.fromNullable(null), false, 0); String body = "hello,world,foo"; final Map jsonMap = csvParser.parse(body); Assert.assertEquals( @@ -89,4 +89,48 @@ public void testCSVParserWithoutHeader() jsonMap ); } + + @Test + public void testCSVParserWithSkipHeaderRows() + { + final int skipHeaderRows = 2; + final Parser csvParser = new CSVParser( + Optional.absent(), + false, + skipHeaderRows + ); + csvParser.startFileFromBeginning(); + final String[] body = new String[] { + "header,line,1", + "header,line,2", + "hello,world,foo" + }; + int index; + for (index = 0; index < skipHeaderRows; index++) { + Assert.assertNull(csvParser.parse(body[index])); + } + final Map jsonMap = csvParser.parse(body[index]); + Assert.assertEquals( + "jsonMap", + ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), + jsonMap + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testCSVParserWithoutStartFileFromBeginning() + { + final int skipHeaderRows = 2; + final Parser csvParser = new CSVParser( + Optional.absent(), + false, + skipHeaderRows + ); + final String[] body = new String[] { + "header,line,1", + "header,line,2", + "hello,world,foo" + }; + csvParser.parse(body[0]); + } } diff --git a/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java b/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java index 3ca58c67f671..d91ed25cbbc2 100644 --- a/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/parsers/DelimitedParserTest.java @@ -21,7 +21,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import junit.framework.Assert; +import org.junit.Assert; import org.junit.Test; import java.util.Map; @@ -67,7 +67,11 @@ public void testInvalidHeader() public void testTSVParserWithHeader() { String header = "time\tvalue1\tvalue2"; - final Parser delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.absent(), header); + final Parser delimitedParser = new DelimitedParser( + Optional.of("\t"), + Optional.absent(), + header + ); String body = "hello\tworld\tfoo"; final Map jsonMap = delimitedParser.parse(body); Assert.assertEquals( @@ -80,7 +84,12 @@ public void testTSVParserWithHeader() @Test public void testTSVParserWithoutHeader() { - final Parser delimitedParser = new DelimitedParser(Optional.of("\t"), Optional.absent()); + final Parser delimitedParser = new DelimitedParser( + Optional.of("\t"), + Optional.absent(), + false, + 0 + ); String body = "hello\tworld\tfoo"; final Map jsonMap = delimitedParser.parse(body); Assert.assertEquals( @@ -89,4 +98,50 @@ public void testTSVParserWithoutHeader() jsonMap ); } + + @Test + public void testTSVParserWithSkipHeaderRows() + { + final int skipHeaderRows = 2; + final Parser delimitedParser = new DelimitedParser( + Optional.of("\t"), + Optional.absent(), + false, + skipHeaderRows + ); + delimitedParser.startFileFromBeginning(); + final String[] body = new String[] { + "header\tline\t1", + "header\tline\t2", + "hello\tworld\tfoo" + }; + int index; + for (index = 0; index < skipHeaderRows; index++) { + Assert.assertNull(delimitedParser.parse(body[index])); + } + final Map jsonMap = delimitedParser.parse(body[index]); + Assert.assertEquals( + "jsonMap", +
ImmutableMap.of("column_1", "hello", "column_2", "world", "column_3", "foo"), + jsonMap + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testTSVParserWithoutStartFileFromBeginning() + { + final int skipHeaderRows = 2; + final Parser delimitedParser = new DelimitedParser( + Optional.of("\t"), + Optional.absent(), + false, + skipHeaderRows + ); + final String[] body = new String[] { + "header\tline\t1", + "header\tline\t2", + "hello\tworld\tfoo" + }; + delimitedParser.parse(body[0]); + } } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index d4fb1223ec82..614d45ff18b0 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -130,7 +130,9 @@ public static void setupClass() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null), "\t", - ImmutableList.of("timestamp", "product", "tags") + ImmutableList.of("timestamp", "product", "tags"), + false, + 0 ), "UTF-8" ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index 26df8376601a..edda6b9db3cd 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -146,7 +146,9 @@ private Segment createSegment() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null), "\t", - ImmutableList.of("timestamp", "product", "tags") + ImmutableList.of("timestamp", "product", "tags"), + false, + 0 ), "UTF-8" ); diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 6f5297684919..7a29b9248604 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -291,7 +291,9 @@ public static IncrementalIndex loadIncrementalIndex( new DimensionsSpec(DIMENSION_SCHEMAS, null, null), "\t", "\u0001", - Arrays.asList(COLUMNS) + Arrays.asList(COLUMNS), + false, + 0 ) , "utf8" ); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java index 53a586ce4f28..08cd87ac0893 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java @@ -162,6 +162,16 @@ public ReplayableFirehose(InputRowParser parser) throws IOException while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) { try { InputRow row = delegateFirehose.nextRow(); + + // delegateFirehose may return a null row if the underlying parser returns null. 
+ // Ideally this row would also be written, but deserializing null values fails with + // 'com.fasterxml.jackson.databind.RuntimeJsonMappingException: Can not deserialize instance of io.druid.data.input.MapBasedRow out of VALUE_NULL token'. + // Since ReplayableFirehoseFactory is planned to be removed in https://github.com/druid-io/druid/pull/4193, + // we simply skip null rows for now. + if (row == null) { + continue; + } + generator.writeObject(row); dimensionScratch.addAll(row.getDimensions()); counter++; diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index ea8b5590701a..64577a8a8276 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -108,7 +108,9 @@ private void createTestIndex(File segmentDir) throws Exception new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited") + ImmutableList.of("timestamp", "host", "visited"), + false, + 0 ), Charsets.UTF_8.toString() );
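The Parser changes above define the contract that the CSVParser and DelimitedParser tests exercise: startFileFromBeginning() resets any per-file state, parse() returns null for rows the caller must skip, and parse() fails fast when header skipping is configured but the file was never (re)started. Below is a minimal sketch of that contract for illustration only; SkippingParserSketch, its field names, and its exception message are hypothetical and appear nowhere in this patch, and the sketch deliberately omits the deprecated setFieldNames/getFieldNames methods to stay self-contained.

import java.util.HashMap;
import java.util.Map;

import javax.annotation.Nullable;

// Hypothetical, for illustration only: a toy comma-separated line parser that
// honors the startFileFromBeginning()/parse() contract shown in the diff above.
public class SkippingParserSketch
{
  private final int skipHeaderRows;
  private int remainingSkips;
  private boolean fileBegun = false;

  public SkippingParserSketch(int skipHeaderRows)
  {
    this.skipHeaderRows = skipHeaderRows;
  }

  // Resets per-file state; must be called before parse() whenever header-row
  // skipping is configured, mirroring CSVParser and DelimitedParser.
  public void startFileFromBeginning()
  {
    remainingSkips = skipHeaderRows;
    fileBegun = true;
  }

  @Nullable
  public Map<String, Object> parse(String input)
  {
    if (skipHeaderRows > 0 && !fileBegun) {
      // testCSVParserWithoutStartFileFromBeginning() and
      // testTSVParserWithoutStartFileFromBeginning() assert exactly this failure mode.
      throw new UnsupportedOperationException(
          "startFileFromBeginning() must be called before parse() when header rows are skipped"
      );
    }
    if (remainingSkips > 0) {
      remainingSkips--;
      return null; // a null result tells the caller to skip this row
    }
    final Map<String, Object> row = new HashMap<>();
    final String[] values = input.split(",");
    for (int i = 0; i < values.length; i++) {
      row.put("column_" + (i + 1), values[i]);
    }
    return row;
  }
}

With skipHeaderRows = 2, calling startFileFromBeginning() and then parse() on three lines yields null, null, and finally a map of column_1 through column_3, which is the exact sequence asserted in testCSVParserWithSkipHeaderRows().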
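On the consuming side, the now-nullable Firehose.nextRow() means every read loop needs the same null check that ReplayableFirehose gains in this patch. A minimal sketch of such a loop follows, assuming a hypothetical FirehoseDrainer class and process() callback that are not part of this change.

import java.io.IOException;

import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;

// Hypothetical consumer of the new nullable nextRow() contract;
// only Firehose and InputRow come from this code base.
public abstract class FirehoseDrainer
{
  protected abstract void process(InputRow row);

  public void drain(Firehose firehose) throws IOException
  {
    try {
      while (firehose.hasMore()) {
        final InputRow row = firehose.nextRow();
        if (row == null) {
          // A null row means the underlying parser skipped this input
          // (e.g. a header row); the caller simply moves on.
          continue;
        }
        process(row);
      }
    }
    finally {
      firehose.close();
    }
  }
}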