Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;

import io.druid.java.util.common.parsers.CSVParser;
import io.druid.java.util.common.parsers.Parser;

Expand All @@ -35,26 +34,35 @@ public class CSVParseSpec extends ParseSpec
{
private final String listDelimiter;
private final List<String> columns;
private final boolean hasHeaderRow;

@JsonCreator
public CSVParseSpec(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("columns") List<String> columns
@JsonProperty("columns") List<String> columns,
@JsonProperty("hasHeaderRow") boolean hasHeaderRow
)
{
super(timestampSpec, dimensionsSpec);

this.listDelimiter = listDelimiter;
Preconditions.checkNotNull(columns, "columns");
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}

this.columns = columns;

verify(dimensionsSpec.getDimensionNames());
if (columns != null) {
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
"If columns field is not set, the first row of your data must have your header and hasHeaderRow must be set to true."
);
}

this.hasHeaderRow = hasHeaderRow;
}

@JsonProperty
Expand All @@ -69,6 +77,12 @@ public List<String> getColumns()
return columns;
}

@JsonProperty
public boolean isHasHeaderRow()
{
return hasHeaderRow;
}

@Override
public void verify(List<String> usedCols)
{
Expand All @@ -80,23 +94,23 @@ public void verify(List<String> usedCols)
@Override
public Parser<String, Object> makeParser()
{
return new CSVParser(Optional.fromNullable(listDelimiter), columns);
return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow);
}

@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns);
return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow);
}

@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns);
return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, hasHeaderRow);
}

public ParseSpec withColumns(List<String> cols)
{
return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols);
return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, hasHeaderRow);
}
}
48 changes: 33 additions & 15 deletions api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;

import io.druid.java.util.common.parsers.DelimitedParser;
import io.druid.java.util.common.parsers.Parser;

Expand All @@ -36,27 +35,36 @@ public class DelimitedParseSpec extends ParseSpec
private final String delimiter;
private final String listDelimiter;
private final List<String> columns;
private final boolean hasHeaderRow;

@JsonCreator
public DelimitedParseSpec(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("delimiter") String delimiter,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("columns") List<String> columns
@JsonProperty("columns") List<String> columns,
@JsonProperty("hasHeaderRow") boolean hasHeaderRow
)
{
super(timestampSpec, dimensionsSpec);

this.delimiter = delimiter;
this.listDelimiter = listDelimiter;
Preconditions.checkNotNull(columns, "columns");
this.columns = columns;
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
if (columns != null) {
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
"If columns field is not set, the first row of your data must have your header and hasHeaderRow must be set to true."
);
}

verify(dimensionsSpec.getDimensionNames());
this.hasHeaderRow = hasHeaderRow;
}

@JsonProperty("delimiter")
Expand All @@ -77,6 +85,12 @@ public List<String> getColumns()
return columns;
}

@JsonProperty
public boolean isHasHeaderRow()
{
return hasHeaderRow;
}

@Override
public void verify(List<String> usedCols)
{
Expand All @@ -88,38 +102,42 @@ public void verify(List<String> usedCols)
@Override
public Parser<String, Object> makeParser()
{
Parser<String, Object> retVal = new DelimitedParser(
return new DelimitedParser(
Optional.fromNullable(delimiter),
Optional.fromNullable(listDelimiter)
Optional.fromNullable(listDelimiter),
columns,
hasHeaderRow
);
retVal.setFieldNames(columns);
return retVal;
}

@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns);
return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns, hasHeaderRow);
}

@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns);
return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns, hasHeaderRow);
}

public ParseSpec withDelimiter(String delim)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns);
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns,
hasHeaderRow
);
}

public ParseSpec withListDelimiter(String delim)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns);
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns, hasHeaderRow);
}

public ParseSpec withColumns(List<String> cols)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols);
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols,
hasHeaderRow
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ public FileIteratingFirehose(
public boolean hasMore()
{
while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) {
// Close old streams, maybe.
if (lineIterator != null) {
lineIterator.close();
}

lineIterator = lineIterators.next();
parser.reset();
}

return lineIterator != null && lineIterator.hasNext();
Expand All @@ -67,6 +73,7 @@ public InputRow nextRow()
}

lineIterator = lineIterators.next();
parser.reset();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reset the parser at here too. Also, lineIterator must be closed before assigning new one there.

}

return parser.parse(lineIterator.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;

import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.parsers.ParseException;
Expand Down Expand Up @@ -124,18 +123,27 @@ private Map<String, Object> buildStringKeyMap(ByteBuffer input)
return theMap;
}

private Map<String, Object> parseString(String inputString)
public void reset()
{
return parser.parse(inputString);
parser.reset();
}

public InputRow parse(String input)
{
return parseMap(parseString(input));
}

private Map<String, Object> parseString(String inputString)
{
return parser.parse(inputString);
}

private InputRow parseMap(Map<String, Object> theMap)
{
// If a header is present in the data (and with proper configurations), a null is returned
if (theMap == null) {
return null;
}
return mapParser.parse(theMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import io.druid.java.util.common.parsers.Parser;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public void testColumnMissing() throws Exception
Lists.<SpatialDimensionSchema>newArrayList()
),
",",
Arrays.asList("a")
Arrays.asList("a"),
false
);
}

Expand All @@ -60,7 +61,8 @@ public void testComma() throws Exception
Lists.<SpatialDimensionSchema>newArrayList()
),
",",
Arrays.asList("a")
Arrays.asList("a"),
false
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void testSerde() throws IOException
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("abc")), null, null),
"\u0001",
"\u0002",
Arrays.asList("abc")
Arrays.asList("abc"),
false
);
final DelimitedParseSpec serde = jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
Expand Down Expand Up @@ -71,7 +72,8 @@ public void testColumnMissing() throws Exception
),
",",
" ",
Arrays.asList("a")
Arrays.asList("a"),
false
);
}

Expand All @@ -91,12 +93,14 @@ public void testComma() throws Exception
),
",",
null,
Arrays.asList("a")
Arrays.asList("a"),
false
);
}

@Test(expected = NullPointerException.class)
public void testDefaultColumnList(){
@Test(expected = IllegalArgumentException.class)
public void testDefaultColumnList()
{
final DelimitedParseSpec spec = new DelimitedParseSpec(
new TimestampSpec(
"timestamp",
Expand All @@ -110,8 +114,8 @@ public void testDefaultColumnList(){
),
",",
null,
// pass null columns not allowed
null
null,
false
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public LineIterator apply(String s)
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("x")), null, null),
",",
ImmutableList.of("ts", "x")
ImmutableList.of("ts", "x"),
false
),
null
);
Expand Down
9 changes: 6 additions & 3 deletions api/src/test/java/io/druid/data/input/impl/ParseSpecTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void testDuplicateNames() throws Exception
),
",",
" ",
Arrays.asList("a", "b")
Arrays.asList("a", "b"),
false
);
}

Expand All @@ -65,7 +66,8 @@ public void testDimAndDimExcluOverlap() throws Exception
),
",",
null,
Arrays.asList("a", "B")
Arrays.asList("a", "B"),
false
);
}

Expand All @@ -85,7 +87,8 @@ public void testDimExclusionDuplicate() throws Exception
),
",",
null,
Arrays.asList("a", "B")
Arrays.asList("a", "B"),
false
);
}
}
Loading