diff --git a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java index bbe1fc4d2288..601664657884 100644 --- a/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.CSVParser; import io.druid.java.util.common.parsers.Parser; @@ -35,26 +34,35 @@ public class CSVParseSpec extends ParseSpec { private final String listDelimiter; private final List columns; + private final boolean hasHeaderRow; @JsonCreator public CSVParseSpec( @JsonProperty("timestampSpec") TimestampSpec timestampSpec, @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow ) { super(timestampSpec, dimensionsSpec); this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); - for (String column : columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); - } - this.columns = columns; - verify(dimensionsSpec.getDimensionNames()); + if (columns != null) { + for (String column : columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header and hasHeaderRow must be set to true." 
+ ); + } + + this.hasHeaderRow = hasHeaderRow; } @JsonProperty @@ -69,6 +77,12 @@ public List getColumns() return columns; } + @JsonProperty + public boolean isHasHeaderRow() + { + return hasHeaderRow; + } + @Override public void verify(List usedCols) { @@ -80,23 +94,23 @@ public void verify(List usedCols) @Override public Parser makeParser() { - return new CSVParser(Optional.fromNullable(listDelimiter), columns); + return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow); } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns); + return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns); + return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, hasHeaderRow); } public ParseSpec withColumns(List cols) { - return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols); + return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, hasHeaderRow); } } diff --git a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java index 6d7096d7921f..4b0b218b20f1 100644 --- a/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.Parser; @@ -36,6 +35,7 @@ public class DelimitedParseSpec extends ParseSpec private final String delimiter; private final String listDelimiter; private final List columns; + private 
final boolean hasHeaderRow; @JsonCreator public DelimitedParseSpec( @@ -43,20 +43,28 @@ public DelimitedParseSpec( @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, @JsonProperty("delimiter") String delimiter, @JsonProperty("listDelimiter") String listDelimiter, - @JsonProperty("columns") List columns + @JsonProperty("columns") List columns, + @JsonProperty("hasHeaderRow") boolean hasHeaderRow ) { super(timestampSpec, dimensionsSpec); this.delimiter = delimiter; this.listDelimiter = listDelimiter; - Preconditions.checkNotNull(columns, "columns"); this.columns = columns; - for (String column : this.columns) { - Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + if (columns != null) { + for (String column : this.columns) { + Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column); + } + verify(dimensionsSpec.getDimensionNames()); + } else { + Preconditions.checkArgument( + hasHeaderRow, + "If columns field is not set, the first row of your data must have your header and hasHeaderRow must be set to true." 
+ ); } - verify(dimensionsSpec.getDimensionNames()); + this.hasHeaderRow = hasHeaderRow; } @JsonProperty("delimiter") @@ -77,6 +85,12 @@ public List getColumns() return columns; } + @JsonProperty + public boolean isHasHeaderRow() + { + return hasHeaderRow; + } + @Override public void verify(List usedCols) { @@ -88,38 +102,42 @@ public void verify(List usedCols) @Override public Parser makeParser() { - Parser retVal = new DelimitedParser( + return new DelimitedParser( Optional.fromNullable(delimiter), - Optional.fromNullable(listDelimiter) + Optional.fromNullable(listDelimiter), + columns, + hasHeaderRow ); - retVal.setFieldNames(columns); - return retVal; } @Override public ParseSpec withTimestampSpec(TimestampSpec spec) { - return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns); + return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns, hasHeaderRow); } @Override public ParseSpec withDimensionsSpec(DimensionsSpec spec) { - return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns); + return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns, hasHeaderRow); } public ParseSpec withDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns); + return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns, + hasHeaderRow + ); } public ParseSpec withListDelimiter(String delim) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns); + return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns, hasHeaderRow); } public ParseSpec withColumns(List cols) { - return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols); + return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, 
listDelimiter, cols, + hasHeaderRow + ); } } diff --git a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java index 97e33f04a894..03edd8eb462e 100644 --- a/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java +++ b/api/src/main/java/io/druid/data/input/impl/FileIteratingFirehose.java @@ -50,7 +50,13 @@ public FileIteratingFirehose( public boolean hasMore() { while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { + // Close old streams, maybe. + if (lineIterator != null) { + lineIterator.close(); + } + lineIterator = lineIterators.next(); + parser.reset(); } return lineIterator != null && lineIterator.hasNext(); @@ -67,6 +73,7 @@ public InputRow nextRow() } lineIterator = lineIterators.next(); + parser.reset(); } return parser.parse(lineIterator.next()); diff --git a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java index 6a13fcb7bd19..3041c8ec3191 100644 --- a/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java +++ b/api/src/main/java/io/druid/data/input/impl/StringInputRowParser.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; - import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.InputRow; import io.druid.java.util.common.parsers.ParseException; @@ -124,9 +123,9 @@ private Map buildStringKeyMap(ByteBuffer input) return theMap; } - private Map parseString(String inputString) + public void reset() { - return parser.parse(inputString); + parser.reset(); } public InputRow parse(String input) @@ -134,8 +133,17 @@ public InputRow parse(String input) return parseMap(parseString(input)); } + private Map parseString(String inputString) + { + return parser.parse(inputString); + } + 
private InputRow parseMap(Map theMap) { + // If a header is present in the data (and with proper configurations), a null is returned + if (theMap == null) { + return null; + } return mapParser.parse(theMap); } } diff --git a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java index e6740cb63f4b..ec1fdb2a0402 100644 --- a/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/TimeAndDimsParseSpec.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import io.druid.java.util.common.parsers.Parser; import java.util.List; diff --git a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java index 7d99a2b804de..520f98bc03fb 100644 --- a/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/CSVParseSpecTest.java @@ -41,7 +41,8 @@ public void testColumnMissing() throws Exception Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false ); } @@ -60,7 +61,8 @@ public void testComma() throws Exception Lists.newArrayList() ), ",", - Arrays.asList("a") + Arrays.asList("a"), + false ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java index 1ebad8c64141..af80391bb7b3 100644 --- a/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/DelimitedParseSpecTest.java @@ -40,7 +40,8 @@ public void testSerde() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("abc")), null, null), "\u0001", "\u0002", - Arrays.asList("abc") + Arrays.asList("abc"), + false ); final DelimitedParseSpec serde = 
jsonMapper.readValue( jsonMapper.writeValueAsString(spec), @@ -71,7 +72,8 @@ public void testColumnMissing() throws Exception ), ",", " ", - Arrays.asList("a") + Arrays.asList("a"), + false ); } @@ -91,12 +93,14 @@ public void testComma() throws Exception ), ",", null, - Arrays.asList("a") + Arrays.asList("a"), + false ); } - @Test(expected = NullPointerException.class) - public void testDefaultColumnList(){ + @Test(expected = IllegalArgumentException.class) + public void testDefaultColumnList() + { final DelimitedParseSpec spec = new DelimitedParseSpec( new TimestampSpec( "timestamp", @@ -110,8 +114,8 @@ public void testDefaultColumnList(){ ), ",", null, - // pass null columns not allowed - null + null, + false ); } } diff --git a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java index 87f5d20fcd5c..dc5df609c28a 100644 --- a/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java +++ b/api/src/test/java/io/druid/data/input/impl/FileIteratingFirehoseTest.java @@ -67,7 +67,8 @@ public LineIterator apply(String s) new TimestampSpec("ts", "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null), ",", - ImmutableList.of("ts", "x") + ImmutableList.of("ts", "x"), + false ), null ); diff --git a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java index 4b38453fe35f..593934080cca 100644 --- a/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java @@ -45,7 +45,8 @@ public void testDuplicateNames() throws Exception ), ",", " ", - Arrays.asList("a", "b") + Arrays.asList("a", "b"), + false ); } @@ -65,7 +66,8 @@ public void testDimAndDimExcluOverlap() throws Exception ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false ); } @@ -85,7 +87,8 @@ 
public void testDimExclusionDuplicate() throws Exception ), ",", null, - Arrays.asList("a", "B") + Arrays.asList("a", "B"), + false ); } } diff --git a/docs/content/ingestion/data-formats.md b/docs/content/ingestion/data-formats.md index 95807cc60a6a..f539fa0b0a08 100644 --- a/docs/content/ingestion/data-formats.md +++ b/docs/content/ingestion/data-formats.md @@ -72,8 +72,6 @@ If you have nested JSON, [Druid can automatically flatten it for you](flatten-js ### CSV -Since the CSV data cannot contain the column names (no header is allowed), these must be added before that data can be processed: - ```json "parseSpec":{ "format" : "csv", "timestampSpec" : { "column" : "timestamp" @@ -87,12 +85,19 @@ Since the CSV data cannot contain the column names (no header is allowed), these } ``` -The `columns` field must match the columns of your input data in the same order. +#### CSV Index Tasks + +If your file does not have a header as the first line of the file, you must set the `columns` field and ensure that the order of the fields matches the columns of your input data in the same order. +If your file does have a header, you can set a field called `hasHeaderRow` to true, and do not include the `columns` key. + +#### Other CSV Ingestion Tasks + +The `columns` field must be included, and the order of the fields must match the columns of your input data in the same order. -### TSV +### TSV (Delimited) ```json - "parseSpec":{ + "parseSpec": { "format" : "tsv", "timestampSpec" : { "column" : "timestamp" @@ -105,10 +110,17 @@ The `columns` field must match the columns of your input data in the same order. } ``` -The `columns` field must match the columns of your input data in the same order. +#### TSV (Delimited) Index Tasks + +If your file does not have a header as the first line of the file, you must set the `columns` field and ensure that the order of the fields matches the columns of your input data in the same order. 
+If your file does have a header, you can set a field called `hasHeaderRow` to true, and do not include the `columns` key. Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed. +#### Other TSV (Delimited) Ingestion Tasks + +The `columns` field must be included, and the order of the fields must match the columns of your input data in the same order. + ### Regex ```json diff --git a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java index 46d99cbbcc96..3450f11bfada 100644 --- a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java +++ b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java @@ -96,7 +96,8 @@ public static Iterable constructorFeeder() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dim", "keys", "values")), null, null), "\t", ",", - Arrays.asList("ts", "dim", "keys", "values") + Arrays.asList("ts", "dim", "keys", "values"), + false ) , "utf8" ); diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java index a7c27c66638e..a5103a4e2485 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java @@ -33,7 +33,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - import io.druid.guice.annotations.Json; import io.druid.java.util.common.IAE; 
import io.druid.java.util.common.UOE; @@ -41,7 +40,6 @@ import io.druid.java.util.common.parsers.DelimitedParser; import io.druid.java.util.common.parsers.JSONParser; import io.druid.java.util.common.parsers.Parser; - import org.joda.time.Period; import javax.annotation.Nullable; diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 7201bd119133..dd472d6f3bcf 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -346,7 +346,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map data(){ + public static Collection data() + { int[] first = new int[1]; Arrays.fill(first, 13); int[] second = new int[6]; @@ -67,7 +68,8 @@ public static Collection data(){ return Arrays.asList( new Object[][]{ { - DetermineHashedPartitionsJobTest.class.getClass().getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), + DetermineHashedPartitionsJobTest.class.getClass() + .getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), 1L, "2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z", 0, @@ -75,7 +77,8 @@ public static Collection data(){ first }, { - DetermineHashedPartitionsJobTest.class.getClass().getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), + DetermineHashedPartitionsJobTest.class.getClass() + .getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), 100L, "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z", 0, @@ -83,7 +86,8 @@ public static Collection data(){ second }, { - DetermineHashedPartitionsJobTest.class.getClass().getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), + DetermineHashedPartitionsJobTest.class.getClass() + .getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(), 1L, "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z", 0, @@ -116,7 +120,12 
@@ public DetermineHashedPartitionsJobTest( new DelimitedParseSpec( new TimestampSpec("ts", null, null), new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("market", "quality", "placement", "placementish")), + DimensionsSpec.getDefaultSchemas(ImmutableList.of( + "market", + "quality", + "placement", + "placementish" + )), null, null ), @@ -129,7 +138,8 @@ public DetermineHashedPartitionsJobTest( "placement", "placementish", "index" - ) + ), + false ), null ), @@ -176,7 +186,8 @@ public DetermineHashedPartitionsJobTest( } @Test - public void testDetermineHashedPartitions(){ + public void testDetermineHashedPartitions() + { DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig); determineHashedPartitionsJob.run(); Map> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs(); @@ -184,8 +195,8 @@ public void testDetermineHashedPartitions(){ expectedNumTimeBuckets, shardSpecs.entrySet().size() ); - int i=0; - for(Map.Entry> entry : shardSpecs.entrySet()) { + int i = 0; + for (Map.Entry> entry : shardSpecs.entrySet()) { Assert.assertEquals( expectedNumOfShards[i++], entry.getValue().size(), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 9fc2e8eb0400..81766d3fd56a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -227,7 +227,8 @@ public DeterminePartitionsJobTest( new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null), null, - ImmutableList.of("timestamp", "host", "country", "visited_num") + ImmutableList.of("timestamp", "host", "country", "visited_num"), + false ), null ), diff --git 
a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 250b9dd8b045..8ba7bfb9487f 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -142,7 +142,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ), null ), @@ -188,7 +189,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ) ), null, @@ -233,7 +235,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ), null ), @@ -289,7 +292,8 @@ public static Collection constructFeed() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ) ), null, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 8af6c470ff69..74d6ffeb23eb 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ 
b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -77,7 +77,8 @@ public void setup() throws Exception new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited_num") + ImmutableList.of("timestamp", "host", "visited_num"), + false ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index f1c7d558c909..b29d3bb9c4a3 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -265,7 +265,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig() new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(null, null, null), null, - ImmutableList.of("timestamp", "host", "visited") + ImmutableList.of("timestamp", "host", "visited"), + false ), null ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 4b9adba54c4f..ffa6d4780173 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -164,7 +164,8 @@ public InputStream openStream() throws IOException new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TestIndex.DIMENSIONS)), null, null), "\t", "\u0001", - Arrays.asList(TestIndex.COLUMNS) + Arrays.asList(TestIndex.COLUMNS), + false ), null ), diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 75a86d8b84bb..70f6140da070 100644 --- 
a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -272,6 +272,11 @@ private Map> determineShardSpecs( while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); + // If a header is present in the data (and with proper configurations), a null InputRow will get created + if (inputRow == null) { + continue; + } + final Interval interval; if (determineIntervals) { interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 81e85c70dba1..864ee8ab777e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.ParseSpec; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; @@ -74,6 +75,23 @@ public class IndexTaskTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null + ), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val"), + false + ); + private final IndexSpec indexSpec; private final ObjectMapper jsonMapper; private IndexMerger indexMerger; @@ -107,7 +125,7 @@ public void testDeterminePartitions() throws Exception IndexTask indexTask = new IndexTask( null, null, - 
createIngestionSpec(tmpDir, null, 2, null, false, false), + createIngestionSpec(tmpDir, null, null, 2, null, false, false), null, jsonMapper ); @@ -145,7 +163,7 @@ public void testForceExtendableShardSpecs() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, true, false), + createIngestionSpec(tmpDir, null, null, 2, null, true, false), null, jsonMapper ); @@ -185,6 +203,7 @@ public void testWithArbitraryGranularity() throws Exception null, createIngestionSpec( tmpDir, + null, new ArbitraryGranularitySpec( Granularities.MINUTE, Arrays.asList(new Interval("2014/2015")) @@ -220,6 +239,7 @@ public void testIntervalBucketing() throws Exception null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.HOUR, @@ -254,7 +274,7 @@ public void testNumShardsProvided() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, null, 1, false, false), + createIngestionSpec(tmpDir, null, null, null, 1, false, false), null, jsonMapper ); @@ -285,7 +305,7 @@ public void testAppendToExisting() throws Exception IndexTask indexTask = new IndexTask( null, null, - createIngestionSpec(tmpDir, null, 2, null, false, true), + createIngestionSpec(tmpDir, null, null, 2, null, false, true), null, jsonMapper ); @@ -323,6 +343,7 @@ public void testIntervalNotSpecified() throws Exception null, createIngestionSpec( tmpDir, + null, new UniformGranularitySpec( Granularities.HOUR, Granularities.MINUTE, @@ -357,6 +378,110 @@ public void testIntervalNotSpecified() throws Exception Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); } + @Test + public void testCSVFileWithHeader() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + 
writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + null, + true + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + + @Test + public void testCSVFileWithHeaderColumnOverride() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("time,d,val"); + writer.println("2014-01-01T00:00:10Z,a,1"); + + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new CSVParseSpec( + new TimestampSpec( + "time", + "auto", + null + ), + new DimensionsSpec( + null, + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("time", "dim", "val"), + true + ), + null, + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals(Arrays.asList("dim"), segments.get(0).getDimensions()); + Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + } + private final List runTask(final IndexTask indexTask) throws Exception { final List segments = Lists.newArrayList(); @@ -434,6 +559,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException 
private IndexTask.IndexIngestionSpec createIngestionSpec( File baseDir, + ParseSpec parseSpec, GranularitySpec granularitySpec, Integer targetPartitionSize, Integer numShards, @@ -446,20 +572,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( "test", jsonMapper.convertValue( new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ), + parseSpec != null ? parseSpec : DEFAULT_PARSE_SPEC, null ), Map.class diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java index e45ed7e6acc7..377e8138ada4 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/CSVParser.java @@ -33,24 +33,17 @@ public class CSVParser implements Parser { - private final String listDelimiter; - private final Splitter listSplitter; - private final Function valueFunction; - - private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); - - private ArrayList fieldNames = null; - - public CSVParser(final Optional listDelimiter) + private static final Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) { - this.listDelimiter = listDelimiter.isPresent() ? 
listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; - this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() + return new Function() { @Override public Object apply(String input) { - if (input.contains(CSVParser.this.listDelimiter)) { + if (input.contains(listDelimiter)) { return Lists.newArrayList( Iterables.transform( listSplitter.split(input), @@ -64,6 +57,26 @@ public Object apply(String input) }; } + private final String listDelimiter; + private final Splitter listSplitter; + private final Function valueFunction; + + private final boolean hasHeaderRow; + + private final au.com.bytecode.opencsv.CSVParser parser = new au.com.bytecode.opencsv.CSVParser(); + + private ArrayList fieldNames = null; + private boolean hasParsedHeader = false; + + public CSVParser(final Optional listDelimiter) + { + this.listDelimiter = listDelimiter.isPresent() ? listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + + this.hasHeaderRow = false; + } + public CSVParser(final Optional listDelimiter, final Iterable fieldNames) { this(listDelimiter); @@ -78,6 +91,20 @@ public CSVParser(final Optional listDelimiter, final String header) setFieldNames(header); } + public CSVParser( + final Optional listDelimiter, + final Iterable fieldNames, + final boolean hasHeaderRow + ) + { + this.listDelimiter = listDelimiter.isPresent() ? 
listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + this.hasHeaderRow = hasHeaderRow; + + setFieldNames(fieldNames); + } + public String getListDelimiter() { return listDelimiter; @@ -92,8 +119,10 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(final String header) @@ -112,6 +141,14 @@ public Map parse(final String input) try { String[] values = parser.parseLine(input); + if (hasHeaderRow && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(Arrays.asList(values)); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(values.length)); } @@ -122,4 +159,10 @@ public Map parse(final String input) throw new ParseException(e, "Unable to parse row [%s]", input); } } + + @Override + public void reset() + { + hasParsedHeader = false; + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java index c37c4bda989c..ddf43300fdc9 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/DelimitedParser.java @@ -36,14 +36,40 @@ public class DelimitedParser implements Parser { private static final String DEFAULT_DELIMITER = "\t"; + private static final Function getValueFunction( + final String listDelimiter, + final Splitter listSplitter + ) + { + return new Function() + { + @Override + public Object apply(String input) + { + if (input.contains(listDelimiter)) 
{ + return Lists.newArrayList( + Iterables.transform( + listSplitter.split(input), + ParserUtils.nullEmptyStringFunction + ) + ); + } else { + return ParserUtils.nullEmptyStringFunction.apply(input); + } + } + }; + } + private final String delimiter; private final String listDelimiter; private final Splitter splitter; private final Splitter listSplitter; private final Function valueFunction; + private final boolean hasHeaderRow; private ArrayList fieldNames = null; + private boolean hasParsedHeader = false; public DelimitedParser(final Optional delimiter, Optional listDelimiter) { @@ -58,23 +84,8 @@ public DelimitedParser(final Optional delimiter, Optional listDe this.splitter = Splitter.on(this.delimiter); this.listSplitter = Splitter.on(this.listDelimiter); - this.valueFunction = new Function() - { - @Override - public Object apply(String input) - { - if (input.contains(DelimitedParser.this.listDelimiter)) { - return Lists.newArrayList( - Iterables.transform( - listSplitter.split(input), - ParserUtils.nullEmptyStringFunction - ) - ); - } else { - return ParserUtils.nullEmptyStringFunction.apply(input); - } - } - }; + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + this.hasHeaderRow = false; } public DelimitedParser( @@ -89,13 +100,36 @@ public DelimitedParser( } public DelimitedParser(final Optional delimiter, final Optional listDelimiter, final String header) - { this(delimiter, listDelimiter); setFieldNames(header); } + public DelimitedParser( + final Optional delimiter, + final Optional listDelimiter, + final Iterable fieldNames, + final boolean hasHeaderRow + ) + { + this.delimiter = delimiter.isPresent() ? delimiter.get() : DEFAULT_DELIMITER; + this.listDelimiter = listDelimiter.isPresent() ? 
listDelimiter.get() : Parsers.DEFAULT_LIST_DELIMITER; + + Preconditions.checkState( + !this.delimiter.equals(this.listDelimiter), + "Cannot have same delimiter and list delimiter of [%s]", + this.delimiter + ); + + this.splitter = Splitter.on(this.delimiter); + this.listSplitter = Splitter.on(this.listDelimiter); + this.valueFunction = getValueFunction(this.listDelimiter, this.listSplitter); + this.hasHeaderRow = hasHeaderRow; + + setFieldNames(fieldNames); + } + public String getDelimiter() { return delimiter; @@ -115,8 +149,10 @@ public List getFieldNames() @Override public void setFieldNames(final Iterable fieldNames) { - ParserUtils.validateFields(fieldNames); - this.fieldNames = Lists.newArrayList(fieldNames); + if (fieldNames != null) { + ParserUtils.validateFields(fieldNames); + this.fieldNames = Lists.newArrayList(fieldNames); + } } public void setFieldNames(String header) @@ -135,6 +171,14 @@ public Map parse(final String input) try { Iterable values = splitter.split(input); + if (hasHeaderRow && !hasParsedHeader) { + if (fieldNames == null) { + setFieldNames(values); + } + hasParsedHeader = true; + return null; + } + if (fieldNames == null) { setFieldNames(ParserUtils.generateFieldNames(Iterators.size(values.iterator()))); } @@ -145,4 +189,10 @@ public Map parse(final String input) throw new ParseException(e, "Unable to parse row [%s]", input); } } + + @Override + public void reset() + { + hasParsedHeader = false; + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java index 2717d2db915a..d23d599d3bca 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/JSONPathParser.java @@ -238,7 +238,7 @@ public enum FieldType /** * Specifies a field to be added to the parsed object Map, using JsonPath notation. - * + *

* See https://github.com/jayway/JsonPath for more information. */ public static class FieldSpec @@ -281,5 +281,4 @@ public String getExpr() return expr; } } - } diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java index 78b29de72244..d284317a620f 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/Parser.java @@ -34,6 +34,14 @@ public interface Parser */ public Map parse(String input); + /** + * Resets state within a parser. This may or may not get called at the start of reading of every file. + */ + public default void reset() + { + // do nothing + } + /** * Set the fieldNames that you expect to see in parsed Maps. Deprecated; Parsers should not, in general, be * expected to know what fields they will return. Some individual types of parsers do need to know (like a TSV diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java index 87b6321aa20f..329b02aa944a 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/RegexParser.java @@ -117,7 +117,6 @@ public void setFieldNames(Iterable fieldNames) } @Override - public List getFieldNames() { return fieldNames; diff --git a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java index 7e45bf23d941..7eef985e53eb 100644 --- a/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java +++ b/java-util/src/main/java/io/druid/java/util/common/parsers/ToLowerCaseParser.java @@ -64,4 +64,10 @@ public List getFieldNames() { return baseParser.getFieldNames(); } + + @Override + public void reset() + { + 
baseParser.reset(); + } } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index d4fb1223ec82..a20afb72f5c5 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -130,7 +130,8 @@ public static void setupClass() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null), "\t", - ImmutableList.of("timestamp", "product", "tags") + ImmutableList.of("timestamp", "product", "tags"), + false ), "UTF-8" ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index dc7648154da2..c46c8eb8d22d 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -144,7 +144,8 @@ private Segment createSegment() throws Exception new TimestampSpec("timestamp", "iso", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null), "\t", - ImmutableList.of("timestamp", "product", "tags") + ImmutableList.of("timestamp", "product", "tags"), + false ), "UTF-8" ); diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 6f5297684919..3289836d9553 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -291,7 +291,8 @@ public static IncrementalIndex loadIncrementalIndex( new DimensionsSpec(DIMENSION_SCHEMAS, null, null), "\t", "\u0001", - Arrays.asList(COLUMNS) + Arrays.asList(COLUMNS), + false ) , "utf8" ); diff --git 
a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java index 53a586ce4f28..c50367a796ba 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java @@ -162,6 +162,12 @@ public ReplayableFirehose(InputRowParser parser) throws IOException while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) { try { InputRow row = delegateFirehose.nextRow(); + + // If a header is present in the data (and with proper configurations), a null InputRow will get created + if (row == null) { + continue; + } + generator.writeObject(row); dimensionScratch.addAll(row.getDimensions()); counter++; diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index ea8b5590701a..1e295721a820 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -108,7 +108,8 @@ private void createTestIndex(File segmentDir) throws Exception new TimestampSpec("timestamp", "yyyyMMddHH", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null), null, - ImmutableList.of("timestamp", "host", "visited") + ImmutableList.of("timestamp", "host", "visited"), + false ), Charsets.UTF_8.toString() );