From 7bd60777f47d282f7c8adb368e1cc06ad92b7fbc Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 20 Apr 2017 16:25:51 -0700 Subject: [PATCH 1/9] initial commit --- .../druid/data/input/impl/CSVParseSpec.java | 42 +++-- .../data/input/impl/DelimitedParseSpec.java | 58 +++++-- .../input/impl/FileIteratingFirehose.java | 1 + .../data/input/impl/StringInputRowParser.java | 17 +- .../data/input/impl/TimeAndDimsParseSpec.java | 6 + .../data/input/impl/CSVParseSpecTest.java | 6 +- .../input/impl/DelimitedParseSpecTest.java | 15 +- .../input/impl/FileIteratingFirehoseTest.java | 3 +- .../druid/data/input/impl/ParseSpecTest.java | 9 +- docs/content/ingestion/data-formats.md | 7 +- .../druid/segment/MapVirtualColumnTest.java | 3 +- .../namespace/URIExtractionNamespace.java | 12 ++ .../indexer/BatchDeltaIngestionTest.java | 3 +- .../DetermineHashedPartitionsJobTest.java | 29 ++-- .../indexer/DeterminePartitionsJobTest.java | 3 +- .../druid/indexer/IndexGeneratorJobTest.java | 12 +- .../java/io/druid/indexer/JobHelperTest.java | 3 +- .../indexer/path/DatasourcePathSpecTest.java | 3 +- .../updater/HadoopConverterJobTest.java | 3 +- .../druid/indexing/common/task/IndexTask.java | 4 + .../indexing/common/task/IndexTaskTest.java | 149 +++++++++++++++--- .../java/util/common/parsers/CSVParser.java | 34 +++- .../util/common/parsers/DelimitedParser.java | 29 +++- .../java/util/common/parsers/JSONParser.java | 6 + .../util/common/parsers/JSONPathParser.java | 5 + .../util/common/parsers/JavaScriptParser.java | 6 + .../java/util/common/parsers/Parser.java | 5 + .../java/util/common/parsers/RegexParser.java | 7 +- .../common/parsers/ToLowerCaseParser.java | 6 + .../druid/query/MultiValuedDimensionTest.java | 3 +- .../GroupByQueryRunnerFactoryTest.java | 3 +- .../test/java/io/druid/segment/TestIndex.java | 3 +- .../firehose/ReplayableFirehoseFactory.java | 5 + .../firehose/IngestSegmentFirehoseTest.java | 3 +- 34 files changed, 412 insertions(+), 91 deletions(-) diff --git a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java index bbe1fc4d2288..e8b2788db53a 100644 --- a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.CSVParser; import io.druid.java.util.common.parsers.Parser; @@ -35,26 +34,35 @@ public class CSVParseSpec extends ParseSpec { private final String listDelimiter; private final List columns; + private final boolean firstRowIsHeader; @JsonCreator public CSVParseSpec( @JsonProperty("timestampSpec") TimestampSpec timestampSpec, @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("firstRowIsHeader") boolean firstRowIsHeader ) { super(timestampSpec, dimensionsSpec); this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); - for (String column : columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } - this.columns = columns; - verify(dimensionsSpec.getDimensionNames()); + if (columns != null) { + for (String column : columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + firstRowIsHeader, + "If columns field is not set, the first row of your data must have your header and firstRowIsHeader must be set to true." + ); + } + + this.firstRowIsHeader = firstRowIsHeader; } @JsonProperty @@ -69,6 +77,12 @@ public List getColumns() return columns; } + @JsonProperty + public boolean isFirstRowIsHeader() + { + return firstRowIsHeader; + } + @Override public void verify(List usedCols) { @@ -80,23 +94,27 @@ public void verify(List usedCols) @Override public Parser makeParser() { - return new CSVParser(Optional.fromNullable(listDelimiter), columns); + if (firstRowIsHeader) { + return new CSVParser(Optional.fromNullable(listDelimiter), columns, firstRowIsHeader); + } else { + return new CSVParser(Optional.fromNullable(listDelimiter), columns); + } } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns); + return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, firstRowIsHeader); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns); + return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, firstRowIsHeader); } public ParseSpec withColumns(List cols) { - return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols); + return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, firstRowIsHeader); } } diff --git a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java index 6d7096d7921f..c0c01520634d 100644 --- a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.Parser; @@ -36,6 +35,7 @@ public class DelimitedParseSpec extends ParseSpec private final String delimiter; private final String listDelimiter; private final List columns; + private final boolean firstRowIsHeader; @JsonCreator public DelimitedParseSpec( @@ -43,20 +43,28 @@ public DelimitedParseSpec( @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("delimiter") String delimiter, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("firstRowIsHeader") boolean firstRowIsHeader ) { super(timestampSpec, dimensionsSpec); this.delimiter = delimiter; this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); this.columns = columns; - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + if (columns != null) { + for (String column : this.columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + firstRowIsHeader, + "If columns field is not set, the first row of your data must have your header and firstRowIsHeader must be set to true." + ); } - verify(dimensionsSpec.getDimensionNames()); + this.firstRowIsHeader = firstRowIsHeader; } @JsonProperty("delimiter") @@ -77,6 +85,12 @@ public List getColumns() return columns; } + @JsonProperty + public boolean isFirstRowIsHeader() + { + return firstRowIsHeader; + } + @Override public void verify(List usedCols) { @@ -88,38 +102,48 @@ public void verify(List usedCols) @Override public Parser makeParser() { - Parser retVal = new DelimitedParser( - Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter) - ); - retVal.setFieldNames(columns); - return retVal; + if (firstRowIsHeader) { + return new DelimitedParser( + Optional.fromNullable(delimiter), + Optional.fromNullable(listDelimiter), + firstRowIsHeader + ); + } else { + return new DelimitedParser( + Optional.fromNullable(delimiter), + Optional.fromNullable(listDelimiter) + ); + } } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns); + return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns, firstRowIsHeader); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns); + return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns, firstRowIsHeader); } public ParseSpec withDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns); + return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns, + firstRowIsHeader + ); } public ParseSpec withListDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns); + return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns, firstRowIsHeader); } public ParseSpec withColumns(List cols) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols); + return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols, + firstRowIsHeader + ); } } diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index 97e33f04a894..cebf0e018e99 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -51,6 +51,7 @@ public boolean hasMore() { while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { lineIterator = lineIterators.next(); + parser.reset(); } return lineIterator != null && lineIterator.hasNext(); diff --git a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java index 6a13fcb7bd19..58baeed5b7ca 100644 --- a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; - import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.InputRow; import io.druid.java.util.common.parsers.ParseException; @@ -33,6 +32,7 @@ import java.nio.charset.Charset; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -124,18 +124,27 @@ private Map buildStringKeyMap(ByteBuffer input) return theMap; } - private Map parseString(String inputString) + public void reset() { - return parser.parse(inputString); + parser.reset(); } public InputRow parse(String input) + { + return parseMap(parseString(input)); + } + + private Map parseString(String inputString) { - return parseMap(parseString(input)); + return parser.parse(inputString); } private InputRow parseMap(Map theMap) { + // if the file has a header, null is returned + if (theMap == null) { + return null; + } return mapParser.parse(theMap); } } diff --git a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java index e6740cb63f4b..2789537d1574 100644 --- a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java @@ -64,6 +64,12 @@ public List getFieldNames() { throw new UnsupportedOperationException("not supported"); } + + @Override + public void reset() + { + // do nothing + } }; } diff --git a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java index 7d99a2b804de..520f98bc03fb 100644 --- a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java @@ -41,7 +41,8 @@ public void testColumnMissing() throws Exception Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false ); } @@ -60,7 +61,8 @@ public void testComma() throws Exception Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java index 1ebad8c64141..3cadfee4ff25 100644 --- a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java @@ -40,7 +40,8 @@ public void testSerde() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("abc")), null, null), "\u0001", "\u0002", - Arrays.asList("abc") + Arrays.asList("abc"), + false ); final DelimitedParseSpec serde = jsonMapper.readValue( jsonMapper.writeValueAsString(spec), @@ -71,7 +72,8 @@ public void testColumnMissing() throws Exception ), ",", " ", - Arrays.asList("a") + Arrays.asList("a"), + false ); } @@ -91,12 +93,14 @@ public void testComma() throws Exception ), ",", null, - Arrays.asList("a") + Arrays.asList("a"), + false ); } @Test(expected = NullPointerException.class) - public void testDefaultColumnList(){ + public void testDefaultColumnList() + { final DelimitedParseSpec spec = new DelimitedParseSpec( new TimestampSpec( "timestamp", @@ -111,7 +115,8 @@ public void testDefaultColumnList(){ ",", null, // pass null columns not allowed - null + null, + false ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java index 87f5d20fcd5c..dc5df609c28a 100644 --- a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java +++ b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java @@ -67,7 +67,8 @@ public LineIterator apply(String s) new TimestampSpec("ts", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null), ",", - ImmutableList.of("ts", "x") + ImmutableList.of("ts", "x"), + false ), null ); diff --git a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java index 4b38453fe35f..593934080cca 100644 --- a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java @@ -45,7 +45,8 @@ public void testDuplicateNames() throws Exception ), ",", " ", - Arrays.asList("a", "b") + Arrays.asList("a", "b"), + false ); } @@ -65,7 +66,8 @@ public void testDimAndDimExcluOverlap() throws Exception ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false ); } @@ -85,7 +87,8 @@ public void testDimExclusionDuplicate() throws Exception ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false ); } } diff --git a/docs/content/ingestion/data-formats.md b/docs/content/ingestion/data-formats.md index 95807cc60a6a..eef8d32c2d42 100644 --- a/docs/content/ingestion/data-formats.md +++ b/docs/content/ingestion/data-formats.md @@ -81,13 +81,15 @@ Since the CSV data cannot contain the column names (no header is allowed), these "column" : "timestamp" }, "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"], + "firstRowIsHeader" : "false", "dimensionsSpec" : { "dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"] } } ``` -The `columns` field must match the columns of your input data in the same order. +If your file does not have a header as the first line of the file, you must set the `columns` field and ensure that the order of the fields matches the columns of your input data in the same order. +If your file does have a header, set `firstRowIsHeader` to true, and do not include the `columns` key. ### TSV @@ -105,7 +107,8 @@ The `columns` field must match the columns of your input data in the same order. } ``` -The `columns` field must match the columns of your input data in the same order. +If your file does not have a header as the first line of the file, you must set the `columns` field and ensure that the order of the fields matches the columns of your input data in the same order. +If your file does have a header, set `firstRowIsHeader` to true, and do not include the `columns` key. Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed. diff --git a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java index 46d99cbbcc96..3450f11bfada 100644 --- a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java +++ b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java @@ -96,7 +96,8 @@ public static Iterable constructorFeeder() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim", "keys", "values")), null, null), "\t", ",", - Arrays.asList("ts", "dim", "keys", "values") + Arrays.asList("ts", "dim", "keys", "values"), + false ) , "utf8" ); diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java index a7c27c66638e..9df181bdf8d3 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java @@ -238,6 +238,12 @@ public List getFieldNames() { return delegate.getFieldNames(); } + + @Override + public void reset() + { + // do nothing + } } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format") @@ -597,6 +603,12 @@ public List getFieldNames() { throw new UOE("No field names available"); } + + @Override + public void reset() + { + // do nothing + } }; } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 7201bd119133..dd472d6f3bcf 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -346,7 +346,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map data(){ + public static Collection data() + { int[] first = new int[1]; Arrays.fill(first, 13); int[] second = new int[6]; @@ -67,7 +68,8 @@ public static Collection data(){ return Arrays.asList( new Object[][]{ { - DetermineHashedPartitionsJobTest.class.getClass().getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), + DetermineHashedPartitionsJobTest.class.getClass() + .getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), 1L, "2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z", 0, @@ -75,7 +77,8 @@ public static Collection data(){ first }, { - DetermineHashedPartitionsJobTest.class.getClass().getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), + DetermineHashedPartitionsJobTest.class.getClass() + .getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), 100L, "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z", 0, @@ -83,7 +86,8 @@ public static Collection data(){ second }, { - DetermineHashedPartitionsJobTest.class.getClass().getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), + DetermineHashedPartitionsJobTest.class.getClass() + .getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), 1L, "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z", 0, @@ -116,7 +120,12 @@ public DetermineHashedPartitionsJobTest( new DelimitedParseSpec( new TimestampSpec("ts", null, null), new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("market", "quality", "placement", "placementish")), + DimensionsSpec.getDefaultSchemas(ImmutableList.of( + "market", + "quality", + "placement", + "placementish" + )), null, null ), @@ -129,7 +138,8 @@ public DetermineHashedPartitionsJobTest( "placement", "placementish", "index" - ) + ), + false ), null ), @@ -176,7 +186,8 @@ public DetermineHashedPartitionsJobTest( } @Test - public void testDetermineHashedPartitions(){ + public void testDetermineHashedPartitions() + { DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig); determineHashedPartitionsJob.run(); Map> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs(); @@ -184,8 +195,8 @@ public void testDetermineHashedPartitions(){ expectedNumTimeBuckets, shardSpecs.entrySet().size() ); - int i=0; - for(Map.Entry> entry : shardSpecs.entrySet()) { + int i = 0; + for (Map.Entry> entry : shardSpecs.entrySet()) { Assert.assertEquals( expectedNumOfShards[i++], entry.getValue().size(), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 9fc2e8eb0400..81766d3fd56a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -227,7 +227,8 @@ public DeterminePartitionsJobTest( new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null), null, - ImmutableList.of("timestamp", "host", "country", "visited_num") + ImmutableList.of("timestamp", "host", "country", "visited_num"), + false ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 250b9dd8b045..8ba7bfb9487f 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -142,7 +142,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ), null ), @@ -188,7 +189,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ) ), null, @@ -233,7 +235,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ), null ), @@ -289,7 +292,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ) ), null, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 8af6c470ff69..74d6ffeb23eb 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -77,7 +77,8 @@ public void setup() throws Exception new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index f1c7d558c909..b29d3bb9c4a3 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -265,7 +265,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(null, null, null), null, - ImmutableList.of("timestamp", "host", "visited") + ImmutableList.of("timestamp", "host", "visited"), + false ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 4b9adba54c4f..ffa6d4780173 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -164,7 +164,8 @@ public InputStream openStream() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TestIndex.DIMENSIONS)), null, null), "\t", "\u0001", - Arrays.asList(TestIndex.COLUMNS) + Arrays.asList(TestIndex.COLUMNS), + false ), null ), diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 75a86d8b84bb..c13671d23204 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -272,6 +272,10 @@ private Map> determineShardSpecs( while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); + if (inputRow == null) { + continue; + } + final Interval interval; if (determineIntervals) { interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 81e85c70dba1..864ee8ab777e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; @@ -74,6 +75,23 @@ public class IndexTaskTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null + ), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val"), + false + ); + private final IndexSpec indexSpec; private final ObjectMapper jsonMapper; private IndexMerger indexMerger; @@ -107,7 +125,7 @@ public void testDeterminePartitions() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, false, false), + createIngestionSpec(tmpDir, null, null, 2, null, false, false), null, jsonMapper ); @@ -145,7 +163,7 @@ public void testForceExtendableShardSpecs() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, true, false), + createIngestionSpec(tmpDir, null, null, 2, null, true, false), null, jsonMapper ); @@ -185,6 +203,7 @@ public void testWithArbitraryGranularity() throws Exception null, createIngestionSpec( tmpDir, + null, new ArbitraryGranularitySpec( Granularities.MINUTE, Arrays.asList(new Interval("2014/2015")) @@ -220,6 +239,7 @@ public void testIntervalBucketing() throws Exception null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.HOUR, @@ -254,7 +274,7 @@ public void testNumShardsProvided() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, 1, false, false), + createIngestionSpec(tmpDir, null, null, null, 1, false, false), null, jsonMapper ); @@ -285,7 +305,7 @@ public void testAppendToExisting() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, false, true), + createIngestionSpec(tmpDir, null, null, 2, null, false, true), null, jsonMapper ); @@ -323,6 +343,7 @@ public void testIntervalNotSpecified() throws Exception null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, @@ -357,6 +378,110 @@ public void testIntervalNotSpecified() throws Exception Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); } + @Test + public void testCSVFileWithHeader() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + null, + true + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + + @Test + public void testCSVFileWithHeaderColumnOverride() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("time", "dim", "val"), + true + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("dim"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + private final List runTask(final IndexTask indexTask) throws Exception { final List segments = Lists.newArrayList(); @@ -434,6 +559,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException private IndexTask.IndexIngestionSpec createIngestionSpec( File baseDir, + ParseSpec parseSpec, GranularitySpec granularitySpec, Integer targetPartitionSize, Integer numShards, @@ -446,20 +572,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( "test", jsonMapper.convertValue( new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ), + parseSpec != null ? parseSpec : DEFAULT_PARSE_SPEC, null ), Map.class diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index e45ed7e6acc7..b9c8a06f383c 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -40,6 +40,8 @@ public class CSVParser implements Parser private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); private ArrayList fieldNames = null; + private boolean firstRowIsHeader = false; + private boolean hasParsedHeader = true; public CSVParser(final Optional listDelimiter) { @@ -78,6 +80,18 @@ public CSVParser(final Optional listDelimiter, final String header) setFieldNames(header); } + public CSVParser( + final Optional listDelimiter, + final Iterable fieldNames, + final boolean firstRowIsHeader + ) + { + this(listDelimiter, fieldNames); + + this.firstRowIsHeader = firstRowIsHeader; + this.hasParsedHeader = !firstRowIsHeader; + } + public String getListDelimiter() { return listDelimiter; @@ -92,8 +106,10 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(final String header) @@ -112,6 +128,14 @@ public Map parse(final String input) try { String[] values = parser.parseLine(input); + if (firstRowIsHeader && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(Arrays.asList(values)); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(values.length)); } @@ -122,4 +146,10 @@ public Map parse(final String input) throw new ParseException(e, "Unable to parse row [%s]", input); } } + + @Override + public void reset() + { + hasParsedHeader = !firstRowIsHeader; + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index c37c4bda989c..948500bb5d3f 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -44,6 +44,8 @@ public class DelimitedParser implements Parser private final Function valueFunction; private ArrayList fieldNames = null; + private boolean firstRowIsHeader = false; + private boolean hasParsedHeader = true; public DelimitedParser(final Optional delimiter, Optional listDelimiter) { @@ -89,13 +91,24 @@ public DelimitedParser( } public DelimitedParser(final Optional delimiter, final Optional listDelimiter, final String header) - { this(delimiter, listDelimiter); setFieldNames(header); } + public DelimitedParser( + final Optional delimiter, + final Optional listDelimiter, + final boolean firstRowIsHeader + ) + { + this(delimiter, listDelimiter); + + this.firstRowIsHeader = firstRowIsHeader; + this.hasParsedHeader = !firstRowIsHeader; + } + public String getDelimiter() { return delimiter; @@ -135,6 +148,14 @@ public Map parse(final String input) try { Iterable values = splitter.split(input); + if (firstRowIsHeader && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(values); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(Iterators.size(values.iterator()))); } @@ -145,4 +166,10 @@ public Map parse(final String input) throw new ParseException(e, "Unable to parse row [%s]", input); } } + + @Override + public void reset() + { + hasParsedHeader = !firstRowIsHeader; + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java index 15d2dc8b3b8c..2a6885c35cbd 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java @@ -150,4 +150,10 @@ public Map parse(String input) throw new ParseException(e, "Unable to parse row [%s]", input); } } + + @Override + public void reset() + { + // do nothing + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java index 2717d2db915a..404f5cc75917 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java @@ -282,4 +282,9 @@ public String getExpr() } } + @Override + public void reset() + { + // do nothing + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java index 2357be2245c4..36dbbaf467b0 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java @@ -99,4 +99,10 @@ public List getFieldNames() { throw new UnsupportedOperationException(); } + + @Override + public void reset() + { + // do nothing + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java index 78b29de72244..2c21779c678a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java @@ -34,6 +34,11 @@ public interface Parser */ public Map parse(String input); + /** + * Resets state within a parser. + */ + public void reset(); + /** * Set the fieldNames that you expect to see in parsed Maps. Deprecated; Parsers should not, in general, be * expected to know what fields they will return. Some individual types of parsers do need to know (like a TSV diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java index 87b6321aa20f..7cd0061b12d0 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java @@ -117,9 +117,14 @@ public void setFieldNames(Iterable fieldNames) } @Override - public List getFieldNames() { return fieldNames; } + + @Override + public void reset() + { + // do nothing + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java index 7e45bf23d941..7eef985e53eb 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java @@ -64,4 +64,10 @@ public List getFieldNames() { return baseParser.getFieldNames(); } + + @Override + public void reset() + { + baseParser.reset(); + } } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index d4fb1223ec82..a20afb72f5c5 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -130,7 +130,8 @@ public static void setupClass() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null), "\t", - ImmutableList.of("timestamp", "product", "tags") + ImmutableList.of("timestamp", "product", "tags"), + false ), "UTF-8" ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index dc7648154da2..c46c8eb8d22d 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -144,7 +144,8 @@ private Segment createSegment() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null), "\t", - ImmutableList.of("timestamp", "product", "tags") + ImmutableList.of("timestamp", "product", "tags"), + false ), "UTF-8" ); diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 6f5297684919..3289836d9553 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -291,7 +291,8 @@ public static IncrementalIndex loadIncrementalIndex( new DimensionsSpec(DIMENSION_SCHEMAS, null, null), "\t", "\u0001", - Arrays.asList(COLUMNS) + Arrays.asList(COLUMNS), + false ) , "utf8" ); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java index 53a586ce4f28..2bca6eae939a 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java @@ -162,6 +162,11 @@ public ReplayableFirehose(InputRowParser parser) throws IOException while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) { try { InputRow row = delegateFirehose.nextRow(); + + if (row == null) { + continue; + } + generator.writeObject(row); dimensionScratch.addAll(row.getDimensions()); counter++; diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index ea8b5590701a..1e295721a820 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -108,7 +108,8 @@ private void createTestIndex(File segmentDir) throws Exception new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited") + ImmutableList.of("timestamp", "host", "visited"), + false ), Charsets.UTF_8.toString() ); From c3965ed0e04f6ab12773e20accd3e79dafd63df6 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 20 Apr 2017 16:32:33 -0700 Subject: [PATCH 2/9] small fixes --- .../java/io/druid/data/input/impl/FileIteratingFirehose.java | 2 +- docs/content/ingestion/data-formats.md | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index cebf0e018e99..185f40b1309a 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -51,7 +51,6 @@ public boolean hasMore() { while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { lineIterator = lineIterators.next(); - parser.reset(); } return lineIterator != null && lineIterator.hasNext(); @@ -68,6 +67,7 @@ public InputRow nextRow() } lineIterator = lineIterators.next(); + parser.reset(); } return parser.parse(lineIterator.next()); diff --git a/docs/content/ingestion/data-formats.md b/docs/content/ingestion/data-formats.md index eef8d32c2d42..96e95b2a5983 100644 --- a/docs/content/ingestion/data-formats.md +++ b/docs/content/ingestion/data-formats.md @@ -72,8 +72,6 @@ If you have nested JSON, [Druid can automatically flatten it for you](flatten-js ### CSV -Since the CSV data cannot contain the column names (no header is allowed), these must be added before that data can be processed: - ```json "parseSpec":{ "format" : "csv", From f68be6b68494cafbd19ca561523bca0c008ceebb Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 20 Apr 2017 16:40:06 -0700 Subject: [PATCH 3/9] fix bug --- .../main/java/io/druid/data/input/impl/StringInputRowParser.java | 1 - 1 file changed, 1 deletion(-) diff --git a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java index 58baeed5b7ca..0b14a5a4d827 100644 --- a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java @@ -32,7 +32,6 @@ import java.nio.charset.Charset; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; -import java.util.LinkedHashMap; import java.util.Map; /** From e188eea46cf7ea7e7964a9a76fa63a7a09189f4c Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 25 Apr 2017 15:23:15 -0700 Subject: [PATCH 4/9] fix bug --- .../java/io/druid/data/input/impl/DelimitedParseSpec.java | 4 +++- .../java/io/druid/data/input/impl/DelimitedParseSpecTest.java | 3 +-- .../io/druid/java/util/common/parsers/DelimitedParser.java | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java index c0c01520634d..026ad6d2ef73 100644 --- a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java @@ -106,12 +106,14 @@ public Parser makeParser() return new DelimitedParser( Optional.fromNullable(delimiter), Optional.fromNullable(listDelimiter), + columns, firstRowIsHeader ); } else { return new DelimitedParser( Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter) + Optional.fromNullable(listDelimiter), + columns ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java index 3cadfee4ff25..af80391bb7b3 100644 --- a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java @@ -98,7 +98,7 @@ public void testComma() throws Exception ); } - @Test(expected = NullPointerException.class) + @Test(expected = IllegalArgumentException.class) public void testDefaultColumnList() { final DelimitedParseSpec spec = new DelimitedParseSpec( @@ -114,7 +114,6 @@ public void testDefaultColumnList() ), ",", null, - // pass null columns not allowed null, false ); diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index 948500bb5d3f..a3abfdbc4a5c 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -100,6 +100,7 @@ public DelimitedParser(final Optional delimiter, final Optional public DelimitedParser( final Optional delimiter, final Optional listDelimiter, + final Iterable fieldNames, final boolean firstRowIsHeader ) { From 96374cb81ec0316395b5eb835239e3da4789af8f Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 25 Apr 2017 17:43:20 -0700 Subject: [PATCH 5/9] address code review --- .../druid/data/input/impl/CSVParseSpec.java | 26 +++++------ .../data/input/impl/DelimitedParseSpec.java | 44 ++++++++----------- .../input/impl/FileIteratingFirehose.java | 6 +++ .../data/input/impl/StringInputRowParser.java | 6 +-- .../data/input/impl/TimeAndDimsParseSpec.java | 7 --- docs/content/ingestion/data-formats.md | 21 ++++++--- .../namespace/URIExtractionNamespace.java | 8 ---- .../java/util/common/parsers/CSVParser.java | 12 ++--- .../java/util/common/parsers/JSONParser.java | 6 --- .../util/common/parsers/JSONPathParser.java | 8 +--- .../util/common/parsers/JavaScriptParser.java | 6 --- .../java/util/common/parsers/Parser.java | 5 ++- .../java/util/common/parsers/RegexParser.java | 6 --- 13 files changed, 65 insertions(+), 96 deletions(-) diff --git a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java index e8b2788db53a..601664657884 100644 --- a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java @@ -34,7 +34,7 @@ public class CSVParseSpec extends ParseSpec { private final String listDelimiter; private final List columns; - private final boolean firstRowIsHeader; + private final boolean hasHeaderRow; @JsonCreator public CSVParseSpec( @@ -42,7 +42,7 @@ public CSVParseSpec( @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("listDelimiter") String listDelimiter, @JsonProperty("columns") List columns, - @JsonProperty("firstRowIsHeader") boolean firstRowIsHeader + @JsonProperty("hasHeaderRow") boolean hasHeaderRow ) { super(timestampSpec, dimensionsSpec); @@ -57,12 +57,12 @@ public CSVParseSpec( verify(dimensionsSpec.getDimensionNames()); } else { Preconditions.checkArgument( - firstRowIsHeader, - "If columns field is not set, the first row of your data must have your header and firstRowIsHeader must be set to true." + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header and hasHeaderRow must be set to true." ); } - this.firstRowIsHeader = firstRowIsHeader; + this.hasHeaderRow = hasHeaderRow; } @JsonProperty @@ -78,9 +78,9 @@ public List getColumns() } @JsonProperty - public boolean isFirstRowIsHeader() + public boolean isHasHeaderRow() { - return firstRowIsHeader; + return hasHeaderRow; } @Override @@ -94,27 +94,23 @@ public void verify(List usedCols) @Override public Parser makeParser() { - if (firstRowIsHeader) { - return new CSVParser(Optional.fromNullable(listDelimiter), columns, firstRowIsHeader); - } else { - return new CSVParser(Optional.fromNullable(listDelimiter), columns); - } + return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow); } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, firstRowIsHeader); + return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, firstRowIsHeader); + return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, hasHeaderRow); } public ParseSpec withColumns(List cols) { - return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, firstRowIsHeader); + return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, hasHeaderRow); } } diff --git a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java index 026ad6d2ef73..4b0b218b20f1 100644 --- a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java @@ -35,7 +35,7 @@ public class DelimitedParseSpec extends ParseSpec private final String delimiter; private final String listDelimiter; private final List columns; - private final boolean firstRowIsHeader; + private final boolean hasHeaderRow; @JsonCreator public DelimitedParseSpec( @@ -44,7 +44,7 @@ public DelimitedParseSpec( @JsonProperty("delimiter") String delimiter, @JsonProperty("listDelimiter") String listDelimiter, @JsonProperty("columns") List columns, - @JsonProperty("firstRowIsHeader") boolean firstRowIsHeader + @JsonProperty("hasHeaderRow") boolean hasHeaderRow ) { super(timestampSpec, dimensionsSpec); @@ -59,12 +59,12 @@ public DelimitedParseSpec( verify(dimensionsSpec.getDimensionNames()); } else { Preconditions.checkArgument( - firstRowIsHeader, - "If columns field is not set, the first row of your data must have your header and firstRowIsHeader must be set to true." + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header and hasHeaderRow must be set to true." ); } - this.firstRowIsHeader = firstRowIsHeader; + this.hasHeaderRow = hasHeaderRow; } @JsonProperty("delimiter") @@ -86,9 +86,9 @@ public List getColumns() } @JsonProperty - public boolean isFirstRowIsHeader() + public boolean isHasHeaderRow() { - return firstRowIsHeader; + return hasHeaderRow; } @Override @@ -102,50 +102,42 @@ public void verify(List usedCols) @Override public Parser makeParser() { - if (firstRowIsHeader) { - return new DelimitedParser( - Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter), - columns, - firstRowIsHeader - ); - } else { - return new DelimitedParser( - Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter), - columns - ); - } + return new DelimitedParser( + Optional.fromNullable(delimiter), + Optional.fromNullable(listDelimiter), + columns, + hasHeaderRow + ); } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns, firstRowIsHeader); + return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns, hasHeaderRow); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns, firstRowIsHeader); + return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns, hasHeaderRow); } public ParseSpec withDelimiter(String delim) { return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns, - firstRowIsHeader + hasHeaderRow ); } public ParseSpec withListDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns, firstRowIsHeader); + return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns, hasHeaderRow); } public ParseSpec withColumns(List cols) { return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols, - firstRowIsHeader + hasHeaderRow ); } } diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index 185f40b1309a..03edd8eb462e 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -50,7 +50,13 @@ public FileIteratingFirehose( public boolean hasMore() { while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { + // Close old streams, maybe. + if (lineIterator != null) { + lineIterator.close(); + } + lineIterator = lineIterators.next(); + parser.reset(); } return lineIterator != null && lineIterator.hasNext(); diff --git a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java index 0b14a5a4d827..15e9e1810127 100644 --- a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java @@ -129,9 +129,9 @@ public void reset() } public InputRow parse(String input) - { - return parseMap(parseString(input)); - } + { + return parseMap(parseString(input)); + } private Map parseString(String inputString) { diff --git a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java index 2789537d1574..ec1fdb2a0402 100644 --- a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import io.druid.java.util.common.parsers.Parser; import java.util.List; @@ -64,12 +63,6 @@ public List getFieldNames() { throw new UnsupportedOperationException("not supported"); } - - @Override - public void reset() - { - // do nothing - } }; } diff --git a/docs/content/ingestion/data-formats.md b/docs/content/ingestion/data-formats.md index 96e95b2a5983..f539fa0b0a08 100644 --- a/docs/content/ingestion/data-formats.md +++ b/docs/content/ingestion/data-formats.md @@ -79,20 +79,25 @@ If you have nested JSON, [Druid can automatically flatten it for you](flatten-js "column" : "timestamp" }, "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"], - "firstRowIsHeader" : "false", "dimensionsSpec" : { "dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"] } } ``` +#### CSV Index Tasks + If your file does not have a header as the first line of the file, you must set the `columns` field and ensure that the order of the fields matches the columns of your input data in the same order. -If your file does have a header, set `firstRowIsHeader` to true, and do not include the `columns` key. +If your file does have a header, you can set a field called `hasHeaderRow` to true, and do not include the `columns` key. + +#### Other CSV Ingestion Tasks -### TSV +The `columns` field must be included and and ensure that the order of the fields matches the columns of your input data in the same order. + +### TSV (Delimited) ```json - "parseSpec":{ + "parseSpec": { "format" : "tsv", "timestampSpec" : { "column" : "timestamp" @@ -105,11 +110,17 @@ If your file does have a header, set `firstRowIsHeader` to true, and do not incl } ``` +#### TSV (Delimited) Index Tasks + If your file does not have a header as the first line of the file, you must set the `columns` field and ensure that the order of the fields matches the columns of your input data in the same order. -If your file does have a header, set `firstRowIsHeader` to true, and do not include the `columns` key. +If your file does have a header, you can set a field called `hasHeaderRow` to true, and do not include the `columns` key. Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed. +#### Other TSV (Delimited) Ingestion Tasks + +The `columns` field must be included and and ensure that the order of the fields matches the columns of your input data in the same order. + ### Regex ```json diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java index 9df181bdf8d3..971dfe5aa387 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java @@ -33,7 +33,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - import io.druid.guice.annotations.Json; import io.druid.java.util.common.IAE; import io.druid.java.util.common.UOE; @@ -41,7 +40,6 @@ import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.JSONParser; import io.druid.java.util.common.parsers.Parser; - import org.joda.time.Period; import javax.annotation.Nullable; @@ -603,12 +601,6 @@ public List getFieldNames() { throw new UOE("No field names available"); } - - @Override - public void reset() - { - // do nothing - } }; } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index b9c8a06f383c..4306ae4a65e6 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -40,7 +40,7 @@ public class CSVParser implements Parser private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); private ArrayList fieldNames = null; - private boolean firstRowIsHeader = false; + private boolean hasHeaderRow = false; private boolean hasParsedHeader = true; public CSVParser(final Optional listDelimiter) @@ -83,13 +83,13 @@ public CSVParser(final Optional listDelimiter, final String header) public CSVParser( final Optional listDelimiter, final Iterable fieldNames, - final boolean firstRowIsHeader + final boolean hasHeaderRow ) { this(listDelimiter, fieldNames); - this.firstRowIsHeader = firstRowIsHeader; - this.hasParsedHeader = !firstRowIsHeader; + this.hasHeaderRow = hasHeaderRow; + this.hasParsedHeader = !hasHeaderRow; } public String getListDelimiter() @@ -128,7 +128,7 @@ public Map parse(final String input) try { String[] values = parser.parseLine(input); - if (firstRowIsHeader && !hasParsedHeader) { + if (hasHeaderRow && !hasParsedHeader) { if (fieldNames == null) { setFieldNames(Arrays.asList(values)); } @@ -150,6 +150,6 @@ public Map parse(final String input) @Override public void reset() { - hasParsedHeader = !firstRowIsHeader; + hasParsedHeader = !hasHeaderRow; } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java index 2a6885c35cbd..15d2dc8b3b8c 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONParser.java @@ -150,10 +150,4 @@ public Map parse(String input) throw new ParseException(e, "Unable to parse row [%s]", input); } } - - @Override - public void reset() - { - // do nothing - } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java index 404f5cc75917..d23d599d3bca 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java @@ -238,7 +238,7 @@ public enum FieldType /** * Specifies a field to be added to the parsed object Map, using JsonPath notation. - * + *

* See https://github.com/jayway/JsonPath for more information. */ public static class FieldSpec @@ -281,10 +281,4 @@ public String getExpr() return expr; } } - - @Override - public void reset() - { - // do nothing - } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java index 36dbbaf467b0..2357be2245c4 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JavaScriptParser.java @@ -99,10 +99,4 @@ public List getFieldNames() { throw new UnsupportedOperationException(); } - - @Override - public void reset() - { - // do nothing - } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java index 2c21779c678a..d5c0ca395de8 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java @@ -37,7 +37,10 @@ public interface Parser /** * Resets state within a parser. */ - public void reset(); + public default void reset() + { + // do nothing + } /** * Set the fieldNames that you expect to see in parsed Maps. Deprecated; Parsers should not, in general, be diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java index 7cd0061b12d0..329b02aa944a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java @@ -121,10 +121,4 @@ public List getFieldNames() { return fieldNames; } - - @Override - public void reset() - { - // do nothing - } } From 8bbb5d300029080ef1f59202817ed4c38b50ddd0 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 25 Apr 2017 18:01:47 -0700 Subject: [PATCH 6/9] more cr --- .../data/input/impl/StringInputRowParser.java | 2 +- .../java/util/common/parsers/CSVParser.java | 57 +++++++++------ .../util/common/parsers/DelimitedParser.java | 70 ++++++++++++------- .../java/util/common/parsers/Parser.java | 2 +- .../firehose/ReplayableFirehoseFactory.java | 1 + 5 files changed, 82 insertions(+), 50 deletions(-) diff --git a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java index 15e9e1810127..3041c8ec3191 100644 --- a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java @@ -140,7 +140,7 @@ private Map parseString(String inputString) private InputRow parseMap(Map theMap) { - // if the file has a header, null is returned + // If a header is present in the data (and with proper configurations), a null is returned if (theMap == null) { return null; } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index 4306ae4a65e6..1ea546811865 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -33,26 +33,17 @@ public class CSVParser implements Parser { - private final String listDelimiter; - private final Splitter listSplitter; - private final Function valueFunction; - - private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); - - private ArrayList fieldNames = null; - private boolean hasHeaderRow = false; - private boolean hasParsedHeader = true; - - public CSVParser(final Optional listDelimiter) + private static final Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) { - this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; - this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() + return new Function() { @Override public Object apply(String input) { - if (input.contains(CSVParser.this.listDelimiter)) { + if (input.contains(listDelimiter)) { return Lists.newArrayList( Iterables.transform( listSplitter.split(input), @@ -66,6 +57,26 @@ public Object apply(String input) }; } + private final String listDelimiter; + private final Splitter listSplitter; + private final Function valueFunction; + + private final boolean hasHeaderRow; + + private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); + + private ArrayList fieldNames = null; + private boolean hasParsedHeader = false; + + public CSVParser(final Optional listDelimiter) + { + this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + + this.hasHeaderRow = false; + } + public CSVParser(final Optional listDelimiter, final Iterable fieldNames) { this(listDelimiter); @@ -86,10 +97,12 @@ public CSVParser( final boolean hasHeaderRow ) { - this(listDelimiter, fieldNames); - + this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); this.hasHeaderRow = hasHeaderRow; - this.hasParsedHeader = !hasHeaderRow; + + setFieldNames(fieldNames); } public String getListDelimiter() @@ -106,10 +119,8 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - if (fieldNames != null) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); - } + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); } public void setFieldNames(final String header) @@ -150,6 +161,6 @@ public Map parse(final String input) @Override public void reset() { - hasParsedHeader = !hasHeaderRow; + hasParsedHeader = false; } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index a3abfdbc4a5c..b6325ab19d6e 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -36,16 +36,40 @@ public class DelimitedParser implements Parser { private static final String DEFAULT_DELIMITER = "\t"; + private static final Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) + { + return new Function() + { + @Override + public Object apply(String input) + { + if (input.contains(listDelimiter)) { + return Lists.newArrayList( + Iterables.transform( + listSplitter.split(input), + ParserUtils.nullEmptyStringFunction + ) + ); + } else { + return ParserUtils.nullEmptyStringFunction.apply(input); + } + } + }; + } + private final String delimiter; private final String listDelimiter; private final Splitter splitter; private final Splitter listSplitter; private final Function valueFunction; + private final boolean hasHeaderRow; private ArrayList fieldNames = null; - private boolean firstRowIsHeader = false; - private boolean hasParsedHeader = true; + private boolean hasParsedHeader = false; public DelimitedParser(final Optional delimiter, Optional listDelimiter) { @@ -60,23 +84,8 @@ public DelimitedParser(final Optional delimiter, Optional listDe this.splitter = Splitter.on(this.delimiter); this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() - { - @Override - public Object apply(String input) - { - if (input.contains(DelimitedParser.this.listDelimiter)) { - return Lists.newArrayList( - Iterables.transform( - listSplitter.split(input), - ParserUtils.nullEmptyStringFunction - ) - ); - } else { - return ParserUtils.nullEmptyStringFunction.apply(input); - } - } - }; + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + this.hasHeaderRow = false; } public DelimitedParser( @@ -101,13 +110,24 @@ public DelimitedParser( final Optional delimiter, final Optional listDelimiter, final Iterable fieldNames, - final boolean firstRowIsHeader + final boolean hasHeaderRow ) { - this(delimiter, listDelimiter); + this.delimiter = delimiter.isPresent() ? delimiter.get() : DEFAULT_DELIMITER; + this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; - this.firstRowIsHeader = firstRowIsHeader; - this.hasParsedHeader = !firstRowIsHeader; + Preconditions.checkState( + !this.delimiter.equals(this.listDelimiter), + "Cannot have same delimiter and list delimiter of [%s]", + this.delimiter + ); + + this.splitter = Splitter.on(this.delimiter); + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + this.hasHeaderRow = hasHeaderRow; + + setFieldNames(fieldNames); } public String getDelimiter() @@ -149,7 +169,7 @@ public Map parse(final String input) try { Iterable values = splitter.split(input); - if (firstRowIsHeader && !hasParsedHeader) { + if (hasHeaderRow && !hasParsedHeader) { if (fieldNames == null) { setFieldNames(values); } @@ -171,6 +191,6 @@ public Map parse(final String input) @Override public void reset() { - hasParsedHeader = !firstRowIsHeader; + hasParsedHeader = false; } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java index d5c0ca395de8..d284317a620f 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java @@ -35,7 +35,7 @@ public interface Parser public Map parse(String input); /** - * Resets state within a parser. + * Resets state within a parser. This may or may not get called at the start of reading of every file. */ public default void reset() { diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java index 2bca6eae939a..c50367a796ba 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java @@ -163,6 +163,7 @@ public ReplayableFirehose(InputRowParser parser) throws IOException try { InputRow row = delegateFirehose.nextRow(); + // If a header is present in the data (and with proper configurations), a null InputRow will get created if (row == null) { continue; } From aeb0fc68e927e6a774d91c26ea58f696ab70fb31 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 25 Apr 2017 18:03:57 -0700 Subject: [PATCH 7/9] more cr --- .../query/lookup/namespace/URIExtractionNamespace.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java index 971dfe5aa387..a5103a4e2485 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java @@ -236,12 +236,6 @@ public List getFieldNames() { return delegate.getFieldNames(); } - - @Override - public void reset() - { - // do nothing - } } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format") From 7e7d6f04740e040ea5616d6fa22b0be93f536df1 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 25 Apr 2017 18:12:44 -0700 Subject: [PATCH 8/9] more cr --- .../src/main/java/io/druid/indexing/common/task/IndexTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index c13671d23204..70f6140da070 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -272,6 +272,7 @@ private Map> determineShardSpecs( while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); + // If a header is present in the data (and with proper configurations), a null InputRow will get created if (inputRow == null) { continue; } From 6ad82f59aa31b74cf29350db2dbc07ca7eade84e Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 26 Apr 2017 14:45:43 -0700 Subject: [PATCH 9/9] fix --- .../java/io/druid/java/util/common/parsers/CSVParser.java | 6 ++++-- .../io/druid/java/util/common/parsers/DelimitedParser.java | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index 1ea546811865..377e8138ada4 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -119,8 +119,10 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(final String header) diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index b6325ab19d6e..ddf43300fdc9 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -149,8 +149,10 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(String header)