Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.File;
import java.io.IOException;

/**
Expand All @@ -44,7 +43,7 @@ public interface InputEntityReader
*/
ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();

CloseableIterator<InputRow> read(InputEntity source, File temporaryDirectory) throws IOException;
CloseableIterator<InputRow> read() throws IOException;

CloseableIterator<InputRowListPlusJson> sample(InputEntity source, File temporaryDirectory) throws IOException;
CloseableIterator<InputRowListPlusJson> sample() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;

import java.io.File;

/**
* InputFormat abstracts the file format of input data.
* It creates a {@link InputEntityReader} to read data and parse it into {@link InputRow}.
Expand All @@ -53,5 +55,5 @@ public interface InputFormat
@JsonIgnore
boolean isSplittable();

InputEntityReader createReader(InputRowSchema inputRowSchema);
InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.io.File;
import java.io.IOException;
import java.util.List;

Expand All @@ -37,9 +36,9 @@
public abstract class IntermediateRowParsingReader<T> implements InputEntityReader
{
@Override
public CloseableIterator<InputRow> read(InputEntity source, File temporaryDirectory) throws IOException
public CloseableIterator<InputRow> read() throws IOException
{
return intermediateRowIterator(source, temporaryDirectory).flatMap(row -> {
return intermediateRowIterator().flatMap(row -> {
try {
// since parseInputRows() returns a list, the below line always iterates over the list,
// which means it calls Iterator.hasNext() and Iterator.next() at least once per row.
Expand All @@ -56,10 +55,10 @@ public CloseableIterator<InputRow> read(InputEntity source, File temporaryDirect
}

@Override
public CloseableIterator<InputRowListPlusJson> sample(InputEntity source, File temporaryDirectory)
public CloseableIterator<InputRowListPlusJson> sample()
throws IOException
{
return intermediateRowIterator(source, temporaryDirectory).map(row -> {
return intermediateRowIterator().map(row -> {
final String json;
try {
json = toJson(row);
Expand All @@ -83,7 +82,7 @@ public CloseableIterator<InputRowListPlusJson> sample(InputEntity source, File t
* Creates an iterator of intermediate rows. The returned rows will be consumed by {@link #parseInputRows} and
* {@link #toJson}.
*/
protected abstract CloseableIterator<T> intermediateRowIterator(InputEntity source, File temporaryDirectory)
protected abstract CloseableIterator<T> intermediateRowIterator()
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@
public abstract class TextReader extends IntermediateRowParsingReader<String>
{
private final InputRowSchema inputRowSchema;
final InputEntity source;
final File temporaryDirectory;

public TextReader(InputRowSchema inputRowSchema)
public TextReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
}

public InputRowSchema getInputRowSchema()
Expand All @@ -50,7 +54,7 @@ public InputRowSchema getInputRowSchema()
}

@Override
public CloseableIterator<String> intermediateRowIterator(InputEntity source, File temporaryDirectory)
public CloseableIterator<String> intermediateRowIterator()
throws IOException
{
final LineIterator delegate = new LineIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;

import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -117,10 +119,12 @@ public boolean isSplittable()
}

@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema)
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new CsvReader(
inputRowSchema,
source,
temporaryDirectory,
listDelimiter,
columns,
findColumnsFromHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.opencsv.RFC4180Parser;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.TextReader;
Expand All @@ -34,6 +35,7 @@
import org.apache.druid.java.util.common.parsers.Parsers;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -51,13 +53,15 @@ public class CsvReader extends TextReader

CsvReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
@Nullable String listDelimiter,
@Nullable List<String> columns,
boolean findColumnsFromHeader,
int skipHeaderRows
)
{
super(inputRowSchema);
super(inputRowSchema, source, temporaryDirectory);
this.findColumnsFromHeader = findColumnsFromHeader;
this.skipHeaderRows = skipHeaderRows;
final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public CloseableIterator<InputRow> read()
{
return createIterator(entity -> {
// InputEntityReader is stateful and so a new one should be created per entity.
final InputEntityReader reader = inputFormat.createReader(inputRowSchema);
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
try {
return reader.read(entity, temporaryDirectory);
return reader.read();
}
catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -79,9 +79,9 @@ public CloseableIterator<InputRowListPlusJson> sample()
{
return createIterator(entity -> {
// InputEntityReader is stateful and so a new one should be created per entity.
final InputEntityReader reader = inputFormat.createReader(inputRowSchema);
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
try {
return reader.sample(entity, temporaryDirectory);
return reader.sample();
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;

import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -66,9 +68,9 @@ public boolean isSplittable()
}

@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema)
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new JsonReader(inputRowSchema, getFlattenSpec(), objectMapper);
return new JsonReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), objectMapper);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.TextReader;
Expand All @@ -30,6 +31,7 @@
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand All @@ -40,9 +42,15 @@ public class JsonReader extends TextReader
private final ObjectFlattener<JsonNode> flattener;
private final ObjectMapper mapper;

JsonReader(InputRowSchema inputRowSchema, JSONPathSpec flattenSpec, ObjectMapper mapper)
JsonReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
JSONPathSpec flattenSpec,
ObjectMapper mapper
)
{
super(inputRowSchema);
super(inputRowSchema, source, temporaryDirectory);
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker());
this.mapper = mapper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ public void testMultiValues() throws IOException
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of(), "|", true, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;
try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(
Expand Down Expand Up @@ -216,10 +216,12 @@ public void testQuotes() throws IOException
new TimestampSpec("Timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("Timestamp"))),
Collections.emptyList()
)
),
source,
null
);

try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
try (CloseableIterator<InputRow> iterator = reader.read()) {
final Iterator<InputRow> expectedRowIterator = expectedResults.iterator();
while (iterator.hasNext()) {
Assert.assertTrue(expectedRowIterator.hasNext());
Expand All @@ -237,8 +239,8 @@ public void testRussianTextMess() throws IOException
)
);
final CsvInputFormat format = new CsvInputFormat(ImmutableList.of("ts", "name", "Comment"), null, false, 0);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA);
try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
Assert.assertEquals(DateTimes.of("2019-01-01T00:00:10Z"), row.getTimestamp());
Expand Down Expand Up @@ -267,9 +269,9 @@ private ByteEntity writeData(List<String> lines) throws IOException

private void assertResult(ByteEntity source, CsvInputFormat format) throws IOException
{
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA);
final InputEntityReader reader = format.createReader(INPUT_ROW_SCHEMA, source, null);
int numResults = 0;
try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ public void testParseRow() throws IOException
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
)
),
source,
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Expand Down Expand Up @@ -112,11 +114,13 @@ public void testParseRowWithConditional() throws IOException
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
Collections.emptyList()
)
),
source,
null
);

final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read(source, null)) {
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.apache.druid.data.input.impl;

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;

import java.io.File;

public class NoopInputFormat implements InputFormat
{
@Override
Expand All @@ -32,7 +35,7 @@ public boolean isSplittable()
}

@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema)
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
throw new UnsupportedOperationException();
}
Expand Down