Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.TsvInputFormat;
import org.apache.druid.guice.annotations.UnstableApi;

import java.io.File;
Expand All @@ -44,7 +45,8 @@
@JsonSubTypes(value = {
@Type(name = "csv", value = CsvInputFormat.class),
@Type(name = "json", value = JsonInputFormat.class),
@Type(name = "regex", value = RegexInputFormat.class)
@Type(name = "regex", value = RegexInputFormat.class),
@Type(name = "tsv", value = TsvInputFormat.class)
})
public interface InputFormat
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Parser<String, Object> makeParser()
@Override
public InputFormat toInputFormat()
{
return new CsvInputFormat(columns, listDelimiter, hasHeaderRow, skipHeaderRows);
return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,12 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;

import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class CsvInputFormat implements InputFormat
public class CsvInputFormat extends SeparateValueInputFormat
{
private final String listDelimiter;
private final List<String> columns;
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;

@JsonCreator
public CsvInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
Expand All @@ -53,104 +36,6 @@ public CsvInputFormat(
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
this.listDelimiter = listDelimiter;
this.columns = columns == null ? Collections.emptyList() : columns;
//noinspection ConstantConditions
this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("hasHeaderRow", hasHeaderRow),
new Property<>("findColumnsFromHeader", findColumnsFromHeader)
)
).getValue();
this.skipHeaderRows = skipHeaderRows;

if (!this.columns.isEmpty()) {
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
} else {
Preconditions.checkArgument(
this.findColumnsFromHeader,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}

/**
 * Test-only convenience constructor that takes a single header flag.
 *
 * Delegates to the JSON-creator constructor, passing {@code null} for its third argument and
 * {@code findColumnsFromHeader} for its fourth.
 * NOTE(review): the third argument is presumably the legacy {@code hasHeaderRow} property; the
 * full parameter list of the delegate is not visible here, so confirm against it.
 *
 * @param columns               column names; the delegate replaces {@code null} with an empty list
 * @param listDelimiter         delimiter used for multi-value fields, stored as given
 * @param findColumnsFromHeader whether column names are taken from the header row
 * @param skipHeaderRows        number of header rows to skip
 */
@VisibleForTesting
public CsvInputFormat(
List<String> columns,
String listDelimiter,
boolean findColumnsFromHeader,
int skipHeaderRows
)
{
this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows);
}

/**
 * Returns the configured column names, exposed as a JSON property.
 *
 * Never {@code null}: the constructor stores an empty list when no columns were supplied.
 */
@JsonProperty
public List<String> getColumns()
{
return columns;
}

/**
 * Returns the list delimiter exactly as it was passed to the constructor, exposed as a JSON
 * property. No default is substituted in this class.
 */
@JsonProperty
public String getListDelimiter()
{
return listDelimiter;
}

/**
 * Returns whether column names are to be discovered from the header row, exposed as a JSON
 * property.
 *
 * The value is resolved in the constructor via {@code Checks.checkOneNotNullOrEmpty} from the
 * {@code hasHeaderRow} and {@code findColumnsFromHeader} properties.
 */
@JsonProperty
public boolean isFindColumnsFromHeader()
{
return findColumnsFromHeader;
}

/**
 * Returns the number of header rows to skip, as passed to the constructor; exposed as a JSON
 * property.
 */
@JsonProperty
public int getSkipHeaderRows()
{
return skipHeaderRows;
}

/**
 * Reports this format as splittable; always returns {@code true}.
 */
@Override
public boolean isSplittable()
{
return true;
}

/**
 * Creates a {@link CsvReader} over the given entity.
 *
 * The reader receives the supplied schema, source and temporary directory, plus this format's
 * list delimiter, column list, header-discovery flag and header-row skip count.
 *
 * @param inputRowSchema     schema handed to the reader
 * @param source             entity to read from
 * @param temporaryDirectory temporary directory handed to the reader
 * @return a new reader configured from this format
 */
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
  final CsvReader csvReader = new CsvReader(
      inputRowSchema,
      source,
      temporaryDirectory,
      listDelimiter,
      columns,
      findColumnsFromHeader,
      skipHeaderRows
  );
  return csvReader;
}

/**
 * Compares this format with another object.
 *
 * Two formats are equal when they are of exactly the same class and agree on
 * {@code findColumnsFromHeader}, {@code skipHeaderRows}, {@code listDelimiter} and
 * {@code columns}.
 *
 * @param o object to compare with, may be {@code null}
 * @return {@code true} if {@code o} is an equal format
 */
@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  if (o == null) {
    return false;
  }
  if (getClass() != o.getClass()) {
    return false;
  }

  final CsvInputFormat that = (CsvInputFormat) o;
  // Cheap primitive comparisons first, then the object-valued fields.
  if (findColumnsFromHeader != that.findColumnsFromHeader) {
    return false;
  }
  if (skipHeaderRows != that.skipHeaderRows) {
    return false;
  }
  if (!Objects.equals(listDelimiter, that.listDelimiter)) {
    return false;
  }
  return Objects.equals(columns, that.columns);
}

@Override
public int hashCode()
{
return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows);
super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.CSV);
}
}
108 changes: 10 additions & 98 deletions core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,15 @@

package org.apache.druid.data.input.impl;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import com.opencsv.enums.CSVReaderNullFieldIndicator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.TextReader;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.ParserUtils;
import org.apache.druid.java.util.common.parsers.Parsers;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CsvReader extends TextReader
public class CsvReader extends SeparateValueReader
{
private final RFC4180Parser parser = CsvReader.createOpenCsvParser();
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Function<String, Object> multiValueFunction;
@Nullable
private List<String> columns;

/**
 * Creates the OpenCSV RFC 4180 parser used for splitting lines.
 *
 * When {@code NullHandling.replaceWithDefault()} is true, a plain {@code RFC4180Parser} is
 * returned. Otherwise the parser is built with the
 * {@code CSVReaderNullFieldIndicator.EMPTY_SEPARATORS} field-as-null setting.
 *
 * @return a new parser instance
 */
public static RFC4180Parser createOpenCsvParser()
{
  if (NullHandling.replaceWithDefault()) {
    return new RFC4180Parser();
  }

  final RFC4180ParserBuilder builder = new RFC4180ParserBuilder();
  return builder.withFieldAsNull(CSVReaderNullFieldIndicator.EMPTY_SEPARATORS).build();
}

CsvReader(
InputRowSchema inputRowSchema,
InputEntity source,
Expand All @@ -72,69 +38,15 @@ public static RFC4180Parser createOpenCsvParser()
int skipHeaderRows
)
{
super(inputRowSchema, source, temporaryDirectory);
this.findColumnsFromHeader = findColumnsFromHeader;
this.skipHeaderRows = skipHeaderRows;
final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row

if (this.columns != null) {
for (String column : this.columns) {
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
}
} else {
Preconditions.checkArgument(
findColumnsFromHeader,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}

/**
 * Parses one line of text into a single-element list of input rows.
 *
 * The line is first turned into a column-name to value map by {@code parseLine}. That map is
 * then converted with {@code MapInputRowParser.parse} using this reader's input row schema.
 *
 * @param line one line of the input
 * @return a singleton list holding the parsed row
 */
@Override
public List<InputRow> parseInputRows(String line) throws IOException, ParseException
{
  // Parse the line before touching the schema, keeping the original evaluation order.
  final Map<String, Object> columnValues = parseLine(line);
  final InputRow row = MapInputRowParser.parse(getInputRowSchema(), columnValues);
  return Collections.singletonList(row);
}

/**
 * Converts an intermediate row (a line of text) into a column-name to value map by delegating
 * to {@code parseLine}.
 *
 * @param intermediateRow the line to convert
 * @return the map produced by {@code parseLine}
 */
@Override
public Map<String, Object> toMap(String intermediateRow) throws IOException
{
return parseLine(intermediateRow);
}

private Map<String, Object> parseLine(String line) throws IOException
{
final String[] parsed = parser.parseLine(line);
return Utils.zipMapPartial(
Preconditions.checkNotNull(columns, "columns"),
Iterables.transform(Arrays.asList(parsed), multiValueFunction)
super(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows,
SeparateValueInputFormat.Format.CSV
);
}

/**
 * Returns the number of header lines to skip, i.e. the {@code skipHeaderRows} value this reader
 * was constructed with.
 */
@Override
public int getNumHeaderLinesToSkip()
{
return skipHeaderRows;
}

/**
 * Returns whether a header line must be processed, i.e. the {@code findColumnsFromHeader} flag
 * this reader was constructed with.
 */
@Override
public boolean needsToProcessHeaderLine()
{
return findColumnsFromHeader;
}

/**
 * Derives the column names from a header line and stores them in {@code columns}.
 *
 * The line is split with the CSV parser and the resulting fields are passed to
 * {@code findOrCreateColumnNames}. The result is assigned to {@code columns} before the
 * emptiness check runs.
 *
 * @param line the header line
 * @throws ISE if {@code findColumnsFromHeader} is false, or if the resulting column list is empty
 */
@Override
public void processHeaderLine(String line) throws IOException
{
  if (!findColumnsFromHeader) {
    throw new ISE("Don't call this if findColumnsFromHeader = false");
  }

  final String[] headerFields = parser.parseLine(line);
  final List<String> headerNames = Arrays.asList(headerFields);
  columns = findOrCreateColumnNames(headerNames);

  if (columns.isEmpty()) {
    throw new ISE("Empty columns");
  }
}
}
Loading