Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions api/src/main/java/io/druid/data/input/Firehose.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.data.input;

import javax.annotation.Nullable;
import java.io.Closeable;

/**
Expand Down Expand Up @@ -46,14 +47,16 @@ public interface Firehose extends Closeable
*
* @return true if and when there is another row available, false if the stream has dried up
*/
public boolean hasMore();
boolean hasMore();

/**
* The next row available. Should only be called if hasMore returns true.
* The return value can be null, which means the caller must skip this row.
*
* @return The next row
*/
public InputRow nextRow();
@Nullable
InputRow nextRow();

/**
* Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
Expand All @@ -74,5 +77,5 @@ public interface Firehose extends Closeable
* because of InputRows delivered by prior calls to {@link #nextRow()}.
* </p>
*/
public Runnable commit();
Runnable commit();
}
49 changes: 36 additions & 13 deletions api/src/main/java/io/druid/data/input/impl/CSVParseSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;

import io.druid.java.util.common.parsers.CSVParser;
import io.druid.java.util.common.parsers.Parser;

Expand All @@ -35,26 +34,38 @@ public class CSVParseSpec extends ParseSpec
{
private final String listDelimiter;
private final List<String> columns;
private final boolean hasHeaderRow;
private final int skipHeaderRows;

@JsonCreator
public CSVParseSpec(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("columns") List<String> columns
@JsonProperty("columns") List<String> columns,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it backwards compatible to add fields to the JSON form? Maybe we need to add a legacy constructor without those fields? Also, it is used in Tranquility: https://github.com/druid-io/tranquility/blob/63eb64e4cf96e62abdb9e784ce7b07c90f83ebb4/server/src/test/scala/com/metamx/tranquility/server/TranquilityServletTest.scala#L277

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double-checked this: an old-format JSON spec that doesn't have the new fields deserializes fine (it gets hasHeaderRow=false, skipHeaderRows=0).

Looks like this does need a legacy constructor though.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the check! I raised #4388.

@JsonProperty("hasHeaderRow") boolean hasHeaderRow,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(timestampSpec, dimensionsSpec);

this.listDelimiter = listDelimiter;
Preconditions.checkNotNull(columns, "columns");
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}

this.columns = columns;

verify(dimensionsSpec.getDimensionNames());
this.hasHeaderRow = hasHeaderRow;
this.skipHeaderRows = skipHeaderRows;

if (columns != null) {
for (String column : columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}

@JsonProperty
Expand All @@ -69,6 +80,18 @@ public List<String> getColumns()
return columns;
}

/**
 * Returns the {@code hasHeaderRow} flag this spec was constructed with (JSON property "hasHeaderRow").
 * The same value is handed to the {@code CSVParser} created by {@code makeParser()}.
 * NOTE(review): judging by the constructor's error message, {@code true} presumably means the first row of
 * the data carries the column header — confirm against the parser implementation.
 */
@JsonProperty
public boolean isHasHeaderRow()
{
return hasHeaderRow;
}

/**
 * Returns the {@code skipHeaderRows} value this spec was constructed with (JSON property "skipHeaderRows").
 * The same value is handed to the {@code CSVParser} created by {@code makeParser()}.
 * NOTE(review): presumably the number of leading rows to skip in each input — confirm against the parser
 * implementation.
 */
@JsonProperty("skipHeaderRows")
public int getSkipHeaderRows()
{
return skipHeaderRows;
}

@Override
public void verify(List<String> usedCols)
{
Expand All @@ -80,23 +103,23 @@ public void verify(List<String> usedCols)
@Override
public Parser<String, Object> makeParser()
{
return new CSVParser(Optional.fromNullable(listDelimiter), columns);
return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow, skipHeaderRows);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Optional comment) Note that Optional is not recommended to be used as a method/constructor parameter: http://stackoverflow.com/a/26328555/648955.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found some more methods that take an Optional as a parameter, such as TaskLockBox.tryLock() or OverlordResource.asLeaderWith(). I think it would be better to fix all of this code together. I'll open an issue for it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue: #4275

}

@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns);
return new CSVParseSpec(spec, getDimensionsSpec(), listDelimiter, columns, hasHeaderRow, skipHeaderRows);
}

@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns);
return new CSVParseSpec(getTimestampSpec(), spec, listDelimiter, columns, hasHeaderRow, skipHeaderRows);
}

public ParseSpec withColumns(List<String> cols)
{
return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols);
return new CSVParseSpec(getTimestampSpec(), getDimensionsSpec(), listDelimiter, cols, hasHeaderRow, skipHeaderRows);
}
}
97 changes: 81 additions & 16 deletions api/src/main/java/io/druid/data/input/impl/DelimitedParseSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;

import io.druid.java.util.common.parsers.DelimitedParser;
import io.druid.java.util.common.parsers.Parser;

Expand All @@ -36,27 +35,40 @@ public class DelimitedParseSpec extends ParseSpec
private final String delimiter;
private final String listDelimiter;
private final List<String> columns;
private final boolean hasHeaderRow;
private final int skipHeaderRows;

@JsonCreator
public DelimitedParseSpec(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("delimiter") String delimiter,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("columns") List<String> columns
@JsonProperty("columns") List<String> columns,
@JsonProperty("hasHeaderRow") boolean hasHeaderRow,
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(timestampSpec, dimensionsSpec);

this.delimiter = delimiter;
this.listDelimiter = listDelimiter;
Preconditions.checkNotNull(columns, "columns");
this.columns = columns;
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
this.hasHeaderRow = hasHeaderRow;
this.skipHeaderRows = skipHeaderRows;

if (columns != null) {
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
verify(dimensionsSpec.getDimensionNames());
} else {
Preconditions.checkArgument(
hasHeaderRow,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}

verify(dimensionsSpec.getDimensionNames());
}

@JsonProperty("delimiter")
Expand All @@ -77,6 +89,18 @@ public List<String> getColumns()
return columns;
}

/**
 * Returns the {@code hasHeaderRow} flag this spec was constructed with (JSON property "hasHeaderRow").
 * The same value is handed to the {@code DelimitedParser} created by {@code makeParser()}.
 * NOTE(review): judging by the constructor's error message, {@code true} presumably means the first row of
 * the data carries the column header — confirm against the parser implementation.
 */
@JsonProperty
public boolean isHasHeaderRow()
{
return hasHeaderRow;
}

/**
 * Returns the {@code skipHeaderRows} value this spec was constructed with (JSON property "skipHeaderRows").
 * The same value is handed to the {@code DelimitedParser} created by {@code makeParser()}.
 * NOTE(review): presumably the number of leading rows to skip in each input — confirm against the parser
 * implementation.
 */
@JsonProperty("skipHeaderRows")
public int getSkipHeaderRows()
{
return skipHeaderRows;
}

@Override
public void verify(List<String> usedCols)
{
Expand All @@ -88,38 +112,79 @@ public void verify(List<String> usedCols)
@Override
public Parser<String, Object> makeParser()
{
Parser<String, Object> retVal = new DelimitedParser(
return new DelimitedParser(
Optional.fromNullable(delimiter),
Optional.fromNullable(listDelimiter)
Optional.fromNullable(listDelimiter),
columns,
hasHeaderRow,
skipHeaderRows
);
retVal.setFieldNames(columns);
return retVal;
}

@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new DelimitedParseSpec(spec, getDimensionsSpec(), delimiter, listDelimiter, columns);
return new DelimitedParseSpec(
spec,
getDimensionsSpec(),
delimiter,
listDelimiter,
columns,
hasHeaderRow,
skipHeaderRows
);
}

@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new DelimitedParseSpec(getTimestampSpec(), spec, delimiter, listDelimiter, columns);
return new DelimitedParseSpec(
getTimestampSpec(),
spec,
delimiter,
listDelimiter,
columns,
hasHeaderRow,
skipHeaderRows
);
}

public ParseSpec withDelimiter(String delim)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delim, listDelimiter, columns);
return new DelimitedParseSpec(
getTimestampSpec(),
getDimensionsSpec(),
delim,
listDelimiter,
columns,
hasHeaderRow,
skipHeaderRows
);
}

public ParseSpec withListDelimiter(String delim)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, delim, columns);
return new DelimitedParseSpec(
getTimestampSpec(),
getDimensionsSpec(),
delimiter,
delim,
columns,
hasHeaderRow,
skipHeaderRows
);
}

public ParseSpec withColumns(List<String> cols)
{
return new DelimitedParseSpec(getTimestampSpec(), getDimensionsSpec(), delimiter, listDelimiter, cols);
return new DelimitedParseSpec(
getTimestampSpec(),
getDimensionsSpec(),
delimiter,
listDelimiter,
cols,
hasHeaderRow,
skipHeaderRows
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

package io.druid.data.input.impl;

import com.google.common.base.Throwables;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.utils.Runnables;
import org.apache.commons.io.LineIterator;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
*/
Expand All @@ -50,7 +50,7 @@ public FileIteratingFirehose(
public boolean hasMore()
{
while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) {
lineIterator = lineIterators.next();
lineIterator = getNextLineIterator();
}

return lineIterator != null && lineIterator.hasNext();
Expand All @@ -59,21 +59,22 @@ public boolean hasMore()
@Override
public InputRow nextRow()
{
try {
if (lineIterator == null || !lineIterator.hasNext()) {
// Close old streams, maybe.
if (lineIterator != null) {
lineIterator.close();
}
if (!hasMore()) {
throw new NoSuchElementException();
}

lineIterator = lineIterators.next();
}
return parser.parse(lineIterator.next());
}

return parser.parse(lineIterator.next());
}
catch (Exception e) {
throw Throwables.propagate(e);
/**
 * Closes the current {@code lineIterator} (if any), pulls the next one from {@code lineIterators}, and
 * notifies the parser via {@code startFileFromBeginning()} before returning it.
 *
 * This method does not assign {@code lineIterator} itself; the caller stores the returned iterator.
 * It calls {@code lineIterators.next()} unconditionally, so callers must check
 * {@code lineIterators.hasNext()} first.
 *
 * @return the next line iterator taken from {@code lineIterators}
 */
private LineIterator getNextLineIterator()
{
// Release the previous iterator before moving on to the next one.
if (lineIterator != null) {
lineIterator.close();
}

final LineIterator iterator = lineIterators.next();
// NOTE(review): presumably resets per-file parser state (e.g. header handling) — confirm in the parser.
parser.startFileFromBeginning();
return iterator;
}

@Override
Expand Down
Loading