Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.ParserUtils;

import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
Expand All @@ -38,14 +37,12 @@
public abstract class TextReader extends IntermediateRowParsingReader<String>
{
private final InputRowSchema inputRowSchema;
final InputEntity source;
final File temporaryDirectory;
private final InputEntity source;

public TextReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
public TextReader(InputRowSchema inputRowSchema, InputEntity source)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
}

public InputRowSchema getInputRowSchema()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,26 @@
package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import com.opencsv.enums.CSVReaderNullFieldIndicator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;

import javax.annotation.Nullable;
import java.io.File;
import java.util.Arrays;
import java.util.List;

public class CsvInputFormat extends DelimitedInputFormat
public class CsvInputFormat extends FlatTextInputFormat
{
private static final char SEPARATOR = ',';
private static final RFC4180Parser PARSER = createOpenCsvParser();

@JsonCreator
public CsvInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
Expand All @@ -36,6 +49,42 @@ public CsvInputFormat(
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
super(columns, listDelimiter, ",", hasHeaderRow, findColumnsFromHeader, skipHeaderRows);
super(columns, listDelimiter, String.valueOf(SEPARATOR), hasHeaderRow, findColumnsFromHeader, skipHeaderRows);
}

/**
* Returns the delimiter held by the parent class.
*
* Re-declared here only to attach {@link JsonIgnore}; the body simply delegates to the superclass.
* The constructor of this class always passes {@code String.valueOf(SEPARATOR)} as the delimiter.
*/
@Override
@JsonIgnore
public String getDelimiter()
{
return super.getDelimiter();
}

/**
* Reports this format as splittable.
*
* @return always {@code true}
*/
@Override
public boolean isSplittable()
{
return true;
}

/**
* Creates a {@link DelimitedValueReader} for the given source.
*
* The reader receives this format's list delimiter, columns, find-columns-from-header flag and
* skip-header-rows count, plus a line parser that runs the shared {@code PARSER} over each line and
* wraps the resulting array with {@link Arrays#asList}.
*
* @param inputRowSchema     schema handed through to the reader
* @param source             entity handed through to the reader
* @param temporaryDirectory not used by this method
* @return a new {@link DelimitedValueReader}
*/
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new DelimitedValueReader(
inputRowSchema,
source,
getListDelimiter(),
getColumns(),
isFindColumnsFromHeader(),
getSkipHeaderRows(),
// Arrays.asList gives a fixed-size List view over the array returned by parseLine.
line -> Arrays.asList(PARSER.parseLine(line))
);
}

/**
 * Builds the opencsv {@link RFC4180Parser} used to split CSV lines on {@code SEPARATOR}.
 *
 * When {@link NullHandling#replaceWithDefault()} is {@code false}, the parser is additionally
 * configured with {@link CSVReaderNullFieldIndicator#EMPTY_SEPARATORS} as its field-as-null
 * indicator; otherwise only the separator is set.
 *
 * @return a newly built parser
 */
public static RFC4180Parser createOpenCsvParser()
{
  RFC4180ParserBuilder parserBuilder = new RFC4180ParserBuilder();
  if (!NullHandling.replaceWithDefault()) {
    // Same call order as before: null-field indicator first, then the separator.
    parserBuilder = parserBuilder.withFieldAsNull(CSVReaderNullFieldIndicator.EMPTY_SEPARATORS);
  }
  return parserBuilder.withSeparator(SEPARATOR).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,72 +19,30 @@

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Splitter;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;

import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/**
* InputFormat for delimiter-separated value input data with a customizable delimiter (the default is a tab, i.e. TSV).
*/
public class DelimitedInputFormat implements InputFormat
public class DelimitedInputFormat extends FlatTextInputFormat
{
private static final String DEFAULT_DELIMITER = "\t";

/**
* Pairs a single-character delimiter with a human-readable label for it.
*
* {@code CSV} and {@code TSV} carry fixed values; {@code CustomizeSV} starts as {@code '|'} with an
* empty label and is rewritten through {@link #setDelimiter(String, String)}.
*
* NOTE(review): the fields are non-final and {@code setDelimiter} mutates the enum constant itself.
* An enum constant is a single shared instance, so every holder of {@code CustomizeSV} observes the
* most recently set delimiter. This looks unsafe if two formats with different custom delimiters
* coexist - confirm against callers.
*/
public enum Format
{
CSV(',', "comma"),
TSV('\t', "tab"),
CustomizeSV('|', "");

// Not final: overwritten by setDelimiter.
private char delimiter;
private String literal;

Format(char delimiter, String literal)
{
this.delimiter = delimiter;
this.literal = literal;
}

/**
* @return the delimiter character as a one-character string
*/
public String getDelimiterAsString()
{
return String.valueOf(delimiter);
}

/**
* Overwrites this constant's delimiter and label.
*
* @param delimiter only its first character is kept; a null or empty value falls back to a tab
* @param literal   label to store; when null, a label is generated from the full {@code delimiter} string
*/
private void setDelimiter(String delimiter, String literal)
{
this.delimiter = (delimiter != null && delimiter.length() > 0) ? delimiter.charAt(0) : '\t';
this.literal = literal != null ? literal : "customize separator: " + delimiter;
}

/**
* @return the delimiter character
*/
public char getDelimiter()
{
return delimiter;
}

/**
* @return the label associated with the delimiter
*/
public String getLiteral()
{
return literal;
}
}

private final String listDelimiter;
private final List<String> columns;
private final boolean findColumnsFromHeader;
private final int skipHeaderRows;
private final Format format;
private final String delimiter;

@JsonCreator
public DelimitedInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
@JsonProperty("listDelimiter") @Nullable String listDelimiter,
Expand All @@ -94,79 +52,14 @@ public DelimitedInputFormat(
@JsonProperty("skipHeaderRows") int skipHeaderRows
)
{
this.listDelimiter = listDelimiter;
this.columns = columns == null ? Collections.emptyList() : columns;
//noinspection ConstantConditions
this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty(
ImmutableList.of(
new Property<>("hasHeaderRow", hasHeaderRow),
new Property<>("findColumnsFromHeader", findColumnsFromHeader)
)
).getValue();
this.skipHeaderRows = skipHeaderRows;
this.delimiter = delimiter == null ? "\t" : delimiter;
this.format = getFormat(this.delimiter);
Preconditions.checkArgument(
this.delimiter.length() == 1,
"The delimiter should be a single character"
);
Preconditions.checkArgument(
!this.delimiter.equals(listDelimiter),
"Cannot have same delimiter and list delimiter of [%s]",
this.delimiter
super(
columns,
listDelimiter,
delimiter == null ? DEFAULT_DELIMITER : delimiter,
hasHeaderRow,
findColumnsFromHeader,
skipHeaderRows
);
if (!this.columns.isEmpty()) {
for (String column : this.columns) {
Preconditions.checkArgument(
!column.contains(format.getDelimiterAsString()),
"Column[%s] has a " + format.getLiteral() + ", it cannot",
column
);
}
} else {
Preconditions.checkArgument(
this.findColumnsFromHeader,
"If columns field is not set, the first row of your data must have your header"
+ " and hasHeaderRow must be set to true."
);
}
}

/**
 * Maps a delimiter string to its {@link Format} constant.
 *
 * A comma yields {@code CSV} and a tab yields {@code TSV}. Any other value (including null)
 * reconfigures {@code CustomizeSV} with that delimiter and returns it.
 *
 * @param delimiter delimiter string to classify
 * @return the matching format constant
 */
private static Format getFormat(String delimiter)
{
  if (",".equals(delimiter)) {
    return Format.CSV;
  }
  if ("\t".equals(delimiter)) {
    return Format.TSV;
  }
  // Neither built-in delimiter matched: update the shared CustomizeSV constant in place.
  final Format customFormat = Format.CustomizeSV;
  customFormat.setDelimiter(delimiter, null);
  return customFormat;
}


/**
* @return the {@code columns} list held by this format, exposed as a JSON property
*/
@JsonProperty
public List<String> getColumns()
{
return columns;
}

/**
* @return the {@code listDelimiter} held by this format, exposed as a JSON property;
*         the constructor parameter it comes from is declared {@code @Nullable}
*/
@JsonProperty
public String getListDelimiter()
{
return listDelimiter;
}

/**
* @return the {@code findColumnsFromHeader} flag held by this format, exposed as a JSON property
*/
@JsonProperty
public boolean isFindColumnsFromHeader()
{
return findColumnsFromHeader;
}

/**
* @return the {@code skipHeaderRows} count held by this format, exposed as a JSON property
*/
@JsonProperty
public int getSkipHeaderRows()
{
return skipHeaderRows;
}

@Override
Expand All @@ -181,35 +74,36 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
return new DelimitedValueReader(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
skipHeaderRows,
this.format
getListDelimiter(),
getColumns(),
isFindColumnsFromHeader(),
getSkipHeaderRows(),
line -> splitToList(Splitter.on(getDelimiter()), line)
);
}

@Override
public boolean equals(Object o)
/**
* Copied from Guava's {@link Splitter#splitToList(CharSequence)}.
* This is to avoid the error of the missing method signature when using an old Guava library.
* For example, it may happen when running Druid Hadoop indexing jobs, since we may inherit the version provided by
* the Hadoop cluster. See https://github.com/apache/druid/issues/6801.
*/
public static List<String> splitToList(Splitter splitter, String input)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
Preconditions.checkNotNull(input);

Iterator<String> iterator = splitter.split(input).iterator();
List<String> result = new ArrayList<>();

while (iterator.hasNext()) {
String splitValue = iterator.next();
if (!NullHandling.replaceWithDefault() && splitValue.isEmpty()) {
result.add(null);
} else {
result.add(splitValue);
}
}
DelimitedInputFormat format = (DelimitedInputFormat) o;
return findColumnsFromHeader == format.findColumnsFromHeader &&
skipHeaderRows == format.skipHeaderRows &&
Objects.equals(listDelimiter, format.listDelimiter) &&
Objects.equals(columns, format.columns) &&
Objects.equals(this.format, format.format);
}

@Override
public int hashCode()
{
return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows, format);
return Collections.unmodifiableList(result);
}
}
Loading