Add support for headers and skipping thereof for CSV and TSV#4254
jon-wei merged 22 commits into apache:master
Conversation
gianm
left a comment
@jihoonson, in addition to the comments, could you please merge the changes from #4198 into this PR as well? They seem related and there'd be conflicts between the two patches otherwise. I spoke with @fjy offline and he would close #4198 if you did merge that patch into this one.
```java
{
  private final String listDelimiter;
  private final List<String> columns;
  private final Integer maxNumSkipHeadRows;
```
Let's make this an `int` and default to 0 (which Jackson would do anyway).

Similar comment on the other parse specs.
```diff
 @JsonProperty("listDelimiter") String listDelimiter,
-@JsonProperty("columns") List<String> columns
+@JsonProperty("columns") List<String> columns,
+@JsonProperty("maxNumSkipHeadRows") Integer maxNumSkipHeadRows
```
How about skipHeaderRows? (Similar to the naming of hasHeaderRow of #4198).
```java
this.lineIterators = lineIterators;
this.parser = parser;
final ParseSpec parseSpec = parser.getParseSpec();
if (parseSpec instanceof CSVParseSpec) {
```
Hmm, let's think of a way to do this without instanceof.
In #4198 the strategy is for the parser to return null for rows we want to skip. Would that work here too?
Hmm. Returning nulls looks like it would affect query results in Druid. If so, I think returning nulls and skipping rows are different things. For example, a query counting all rows would return different results.
Ah, I see.. But I'm still not sure that's a better way. I prefer not to use nulls to represent something meaningful, because it's difficult to tell whether a null value is an expected result or an error. Is it for sharing some common code paths?
I think the rationale was that we want to avoid instanceof (since it's brittle) and we want to avoid changing the signature of the Parser.parse method (since it's in druid-api and those interfaces are meant to be stable). So returning nulls to signal "skip me" accomplishes those.
I think it's ok for a Parser to behave that way, since it's supposed to throw ParseException if there was really a parse error. Returning null to signal "skip me" seems reasonable.
I'm definitely open to other suggestions though. If you think it's best to change Parser.parse we could perhaps deprecate it and add a new method that returns something like ParseResult.
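To make the null-as-skip protocol concrete, here is a minimal sketch of the idea under discussion. The interface and class names (`LineParser`, `SkippingCsvParser`, `parseToMap`) are simplified stand-ins, not the actual druid-api signatures: the parser returns null for rows it wants skipped, and the caller filters those out rather than treating them as data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for druid-api's Parser interface; names are illustrative.
interface LineParser
{
  // Returns null to signal "skip this row"; a real parse error would throw instead.
  Map<String, Object> parseToMap(String line);
}

class SkippingCsvParser implements LineParser
{
  private final List<String> columns;
  private final int skipHeaderRows;
  private int rowsSeen = 0;

  SkippingCsvParser(List<String> columns, int skipHeaderRows)
  {
    this.columns = columns;
    this.skipHeaderRows = skipHeaderRows;
  }

  @Override
  public Map<String, Object> parseToMap(String line)
  {
    if (rowsSeen++ < skipHeaderRows) {
      return null; // header row: signal "skip me" rather than throwing
    }
    final String[] values = line.split(",");
    final Map<String, Object> row = new HashMap<>();
    for (int i = 0; i < columns.size() && i < values.length; i++) {
      row.put(columns.get(i), values[i]);
    }
    return row;
  }
}

public class NullSkipDemo
{
  public static List<Map<String, Object>> parseAll(LineParser parser, List<String> lines)
  {
    final List<Map<String, Object>> rows = new ArrayList<>();
    for (String line : lines) {
      final Map<String, Object> row = parser.parseToMap(line);
      if (row != null) { // the caller treats null as "skip", not as data
        rows.add(row);
      }
    }
    return rows;
  }

  public static void main(String[] args)
  {
    final List<Map<String, Object>> rows = parseAll(
        new SkippingCsvParser(Arrays.asList("ts", "value"), 1),
        Arrays.asList("ts,value", "2017-05-01,10", "2017-05-02,20")
    );
    System.out.println(rows.size()); // 2: the header row was skipped, not counted
  }
}
```

This illustrates jihoonson's counting concern too: only the caller's filtering decides whether skipped rows disappear before aggregation, which is why the two sides debate whether null means "skip" or "error".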
Ah, yeah. I was thinking that ParseSpec should describe the behavior of the parser, which should be consistent no matter what type of index task is used. That would be more intuitive and reduce user mistakes. Even though ParseSpec is in druid-api, I think we need to support consistent behavior if it is provided by default. What do you think?
Is your idea that an indexing mechanism that doesn't support hasHeaderRow / skipHeaderRows would just ignore them completely? Maybe we could achieve that by having a method like parser.startFile() that FileIteratingFirehose calls, but none of the other Parser callers call. And if it's not called, the Parsers would ignore those header row related fields.
Or were you thinking of a different kind of smarter behavior?
Well, I thought we could throw an error, or at least log that hasHeaderRow / skipHeaderRows will be ignored, so that users can figure out what happened by reading their logs.
If we do the parser.startFile() thing, then I suppose a Parser could throw an error if it has hasHeaderRow / skipHeaderRows set, and has its parse() method called without an earlier startFile() call. Since that would mean it's being used in "stream mode" with parameters that only make sense for files.
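The guard described here can be sketched as follows. This is illustrative only: `FileAwareParser` and `HeaderAwareParser` are hypothetical names, not the actual druid-api `Parser` interface. A file-based caller invokes `startFileFromBeginning()` before parsing; if `parse()` is called without it while a file-only option is set, the parser fails loudly instead of silently ignoring the option.

```java
// Sketch of the startFileFromBeginning() guard idea; names are illustrative,
// not the actual druid-api Parser interface.
interface FileAwareParser
{
  // Called by file-based callers (e.g. FileIteratingFirehose) when a new file starts.
  default void startFileFromBeginning()
  {
  }

  String parse(String input);
}

class HeaderAwareParser implements FileAwareParser
{
  private final boolean hasHeaderRow;
  private boolean fileStarted = false;

  HeaderAwareParser(boolean hasHeaderRow)
  {
    this.hasHeaderRow = hasHeaderRow;
  }

  @Override
  public void startFileFromBeginning()
  {
    fileStarted = true;
  }

  @Override
  public String parse(String input)
  {
    if (hasHeaderRow && !fileStarted) {
      // "Stream mode" with a file-only option: fail loudly instead of
      // silently ignoring hasHeaderRow / skipHeaderRows.
      throw new UnsupportedOperationException("hasHeaderRow requires a file-based firehose");
    }
    return input;
  }
}

public class StartFileDemo
{
  public static void main(String[] args)
  {
    final HeaderAwareParser fileParser = new HeaderAwareParser(true);
    fileParser.startFileFromBeginning(); // file mode: ok
    System.out.println(fileParser.parse("a,b"));

    final HeaderAwareParser streamParser = new HeaderAwareParser(true);
    try {
      streamParser.parse("a,b"); // stream mode: never saw startFileFromBeginning()
    }
    catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The default no-op method keeps stream-oriented callers compiling unchanged, while parsers that care about file boundaries can opt into the check.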
Ah I see. It sounds good. I'll update this patch soon.
Thanks!
```java
 */
public class FileIteratingFirehose implements Firehose
{
  private static final int DEFAULT_NUM_SKIP_HEAD_ROWS = 0;
```
This shouldn't be necessary if the parsers default to skip 0.
The `columns` field must match the columns of your input data in the same order.
Additionally, you can set the number of head rows to be skipped to `maxNumSkipHeadRows`. Note that this option is effective
I would go further and say it's only effective for the non-Hadoop batch "index" task. Because it's also not useful for non-Hadoop realtime tasks; they wouldn't use FileIteratingFirehose.
Right. I updated the comment.
@jihoonson, yeah, code reviews are OK and just some doc changes were needed.
Instead, you can set the `hasHeaderRow` field to true, which makes Druid automatically extract the column information from the header.
Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data in the same order.

Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. Note that `hasHeaderRow` and `skipHeaderRows` are effective
Please include a couple clarifications.
- What happens if you provide these on a task where they aren't supported?
- If you provide both skipHeaderRows and hasHeaderRow, which is applied first? Skip first, then read a header row? Or read a header row and then skip?
Docs look good to me other than that.
Good point. I updated the doc.
```java
}

@JsonProperty("skipHeaderRows")
public Integer getSkipHeaderRows()
```
```diff
- * Parse a String into a Map.
+ * Initialize this parser for centralized batch processing of files like IndexTask.
   */
 default void startFileFromBeginning()
```
Do we really need both this and reset()? I would think that we can get rid of reset() and instead, call startFileFromBeginning() every time a new file starts. imo, reset isn't used for anything else so it's needless to keep it.
```java
this.parseSpec = parseSpec;
this.mapParser = new MapInputRowParser(parseSpec);
this.parser = parseSpec.makeParser();
parser.startFileFromBeginning();
```
StringInputRowParser is used by parser options that aren't file-oriented (you can use it on streams etc) so this isn't a good place to put this. imo, this should replace reset() and be called in places that reset() is currently called (like FileIteratingFirehose). With one addition: it needs to be called before the very first file too.
Right. I moved it to FileIteratingFirehose.
thx @jihoonson, just re-reviewed.
@gianm thanks. I addressed your comments.
leventov
left a comment
👍 for design, textual comments
```diff
 public Parser<String, Object> makeParser()
 {
-  return new CSVParser(Optional.fromNullable(listDelimiter), columns);
+  return new CSVParser(Optional.fromNullable(listDelimiter), columns, hasHeaderRow, skipHeaderRows);
```
(Optional comment) Note that `Optional` is not recommended as a method/constructor parameter: http://stackoverflow.com/a/26328555/648955.
I found some more methods that receive an `Optional` as a parameter, like `TaskLockBox.tryLock()` or `OverlordResource.asLeaderWith()`. I think it would be better to fix these all together. I'll open an issue for it.
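For reference, one common way to keep `Optional` out of constructor signatures is to accept a nullable argument at the boundary and normalize it internally. This is only a sketch of the pattern, not the actual CSVParser code; `DelimiterHolder` and the `\u0001` default (taken from the doc table below) are illustrative.

```java
import java.util.Optional;

class DelimiterHolder
{
  private static final String DEFAULT_LIST_DELIMITER = "\u0001";

  private final String listDelimiter;

  // Nullable parameter at the boundary; Optional stays an implementation detail.
  DelimiterHolder(String listDelimiter)
  {
    this.listDelimiter = Optional.ofNullable(listDelimiter).orElse(DEFAULT_LIST_DELIMITER);
  }

  String getListDelimiter()
  {
    return listDelimiter;
  }
}

public class OptionalParamDemo
{
  public static void main(String[] args)
  {
    System.out.println(new DelimiterHolder("|").getListDelimiter()); // |
    // When null is passed, the default \u0001 delimiter is used.
    System.out.println((int) new DelimiterHolder(null).getListDelimiter().charAt(0));
  }
}
```

Callers never construct an `Optional` just to pass an argument, which is the gist of the linked Stack Overflow advice.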
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`keyColumn`|The name of the column containing the key|no|The first column|
|`valueColumn`|The name of the column containing the value|no|The second column|
|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false|
|`skipHeaderRows`|Number of header rows to be skipped|no|0|
Just reading this doc, it's not clear how `hasHeaderRow` interacts with `skipHeaderRows`: are rows skipped first, with the (skipHeaderRows+1)-th row used as the header row, or is the very first row used as the header row, with skipHeaderRows rows skipped afterwards?
OK, it's explained in another place, but I think it should be explained here as well.
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`columns`|The list of columns in the csv file|yes|`null`|
|`delimiter`|The delimiter in the file|no|tab (`\t`)|
|`listDelimiter`|The list delimiter in the file|no|(`\u0001`)|
|`hasHeaderRow`|A flag to indicate that column information can be extracted from the input files' header row|no|false|
|`skipHeaderRows`|Number of header rows to be skipped|no|0|
Otherwise, you must set the `columns` field and ensure that it matches the columns of your input data in the same order.

Also, you can skip some header rows by setting `skipHeaderRows` in your parseSpec. If both `skipHeaderRows` and `hasHeaderRow` options are set,
`skipHeaderRows` is first applied. For example, if you set `skipHeaderRows` to 2 and `hasHeaderRow` to true, Druid will
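A tiny worked example of that ordering (the data lines are illustrative, modeled on a CloudFront-style log): with `skipHeaderRows` = 2 and `hasHeaderRow` = true, the first two rows are skipped and the third row becomes the header.

```java
import java.util.Arrays;
import java.util.List;

public class HeaderOrderDemo
{
  // Skip first, then read the header: the (skipHeaderRows+1)-th line is the header row.
  public static String headerLine(List<String> lines, int skipHeaderRows, boolean hasHeaderRow)
  {
    return hasHeaderRow ? lines.get(skipHeaderRows) : null;
  }

  public static void main(String[] args)
  {
    final List<String> lines = Arrays.asList(
        "#Version: 1.0",      // skipped (comment line)
        "#Fields: date time", // skipped (comment line)
        "ts,value",           // read as the header row
        "2017-05-01,10"       // first data row
    );
    System.out.println(headerLine(lines, 2, true)); // ts,value
  }
}
```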
@leventov thanks for your review. I addressed your comments.
jon-wei
left a comment
Looks good, +1 on design review
```diff
 @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
 @JsonProperty("listDelimiter") String listDelimiter,
-@JsonProperty("columns") List<String> columns
+@JsonProperty("columns") List<String> columns,
```
Is it backwards compatible to add fields to the JSON form? Maybe we need to add a legacy constructor without those fields? Also it is used in tranquility: https://github.com/druid-io/tranquility/blob/63eb64e4cf96e62abdb9e784ce7b07c90f83ebb4/server/src/test/scala/com/metamx/tranquility/server/TranquilityServletTest.scala#L277
Just double checked this, an old-format JSON spec that doesn't have the new fields deserializes fine (gets hasHeaderRow=false, skipHeaderRows=0).
Looks like this does need a legacy constructor though.
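A minimal sketch of the legacy-constructor pattern being suggested. It is simplified: the real CSVParseSpec constructors take more arguments, and the primary constructor would carry the Jackson `@JsonCreator`/`@JsonProperty` annotations; `CsvSpecSketch` here is a hypothetical stand-in.

```java
import java.util.Arrays;
import java.util.List;

class CsvSpecSketch
{
  private final List<String> columns;
  private final boolean hasHeaderRow;
  private final int skipHeaderRows;

  // New primary constructor; in the real class this would be the @JsonCreator,
  // so an old-format JSON spec with the new properties absent still deserializes.
  CsvSpecSketch(List<String> columns, Boolean hasHeaderRow, Integer skipHeaderRows)
  {
    this.columns = columns;
    this.hasHeaderRow = hasHeaderRow != null && hasHeaderRow;
    this.skipHeaderRows = skipHeaderRows == null ? 0 : skipHeaderRows;
  }

  // Legacy constructor: keeps old call sites (e.g. tranquility's tests)
  // compiling, with the old defaults (hasHeaderRow=false, skipHeaderRows=0).
  @Deprecated
  CsvSpecSketch(List<String> columns)
  {
    this(columns, null, null);
  }

  boolean isHasHeaderRow()
  {
    return hasHeaderRow;
  }

  int getSkipHeaderRows()
  {
    return skipHeaderRows;
  }
}

public class LegacyCtorDemo
{
  public static void main(String[] args)
  {
    @SuppressWarnings("deprecation")
    final CsvSpecSketch legacy = new CsvSpecSketch(Arrays.asList("ts", "value"));
    System.out.println(legacy.isHasHeaderRow());    // false
    System.out.println(legacy.getSkipHeaderRows()); // 0
  }
}
```

The deprecated overload preserves binary and source compatibility for existing callers while steering new code toward the full constructor.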
This PR is similar to #4198, but different. With this PR, users can simply skip head rows rather than guessing the schema from the head rows. It's still useful when some log formats start with comment lines, like https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#LogFileFormat. The `maxNumSkipHeadRows` option is effective only for non-Hadoop index tasks.