Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

/**
* InputEntity abstracts an input entity and knows how to read bytes from the given entity.
* Since the implementations of this interface assume that the given entity is not empty, the InputSources
* should not create InputEntities for empty entities.
*/
@UnstableApi
public interface InputEntity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;

import javax.annotation.Nullable;
import java.util.ArrayList;
Expand Down Expand Up @@ -61,14 +62,18 @@ public long getMaxSplitSize()
@Override
public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
{
final Iterator<T> nonEmptyFileOnlyIterator = Iterators.filter(
inputIterator,
input -> inputAttributeExtractor.apply(input).getSize() > 0
);
return new Iterator<List<T>>()
{
private T peeking;

@Override
public boolean hasNext()
{
return peeking != null || inputIterator.hasNext();
return peeking != null || nonEmptyFileOnlyIterator.hasNext();
}

@Override
Expand All @@ -79,9 +84,9 @@ public List<T> next()
}
final List<T> current = new ArrayList<>();
long splitSize = 0;
while (splitSize < maxSplitSize && (peeking != null || inputIterator.hasNext())) {
while (splitSize < maxSplitSize && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) {
if (peeking == null) {
peeking = inputIterator.next();
peeking = nonEmptyFileOnlyIterator.next();
}
final long size = inputAttributeExtractor.apply(peeking).getSize();
if (current.isEmpty() || splitSize + size < maxSplitSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
Expand All @@ -38,6 +39,7 @@ public class InlineInputSource extends AbstractInputSource
@JsonCreator
public InlineInputSource(@JsonProperty("data") String data)
{
Preconditions.checkArgument(data != null && !data.isEmpty(), "empty data");
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,14 @@ private Iterator<List<File>> getSplitFileIterator(SplitHintSpec splitHintSpec)
/**
 * Returns an iterator over the files produced by the directory listing followed by the explicitly
 * listed files, skipping every file whose {@link File#length()} is not greater than zero.
 *
 * @return a lazily filtered iterator that yields only files with a positive length
 */
@VisibleForTesting
Iterator<File> getFileIterator()
{
  // A single return statement: the unfiltered concatenation must not be returned on its own,
  // otherwise zero-length files would be handed downstream and the filter would be unreachable.
  return Iterators.filter(
      Iterators.concat(
          getDirectoryListingIterator(),
          getFilesListIterator()
      ),
      // File.length() also reports 0 for a path that does not exist, so such entries are skipped too.
      file -> file.length() > 0
  );
}

private Iterator<File> getDirectoryListingIterator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.apache.druid.data.input;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.compress.utils.Lists;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -79,6 +79,29 @@ public void testSplitLargeInputsReturningSplitsOfSingleInput()
}
}

/**
 * Feeds the split hint spec ten empty inputs, then ten inputs of size 3, then ten more empty
 * inputs, and checks that the resulting splits contain 3, 3, 3 and 1 inputs respectively.
 */
@Test
public void testSplitSkippingEmptyInputs()
{
  final int sizeOfNonEmptyInput = 3;
  final int inputsPerGroup = 10;
  // Three consecutive groups of ten sizes each; only the middle group is non-empty.
  final IntStream inputSizes = IntStream
      .range(0, 3 * inputsPerGroup)
      .map(index -> index / inputsPerGroup == 1 ? sizeOfNonEmptyInput : 0);

  // Each Integer in the stream is treated as the size of one input.
  final Function<Integer, InputFileAttribute> attributeOf = InputFileAttribute::new;
  final MaxSizeSplitHintSpec hintSpec = new MaxSizeSplitHintSpec(10L);
  final List<List<Integer>> splits = Lists.newArrayList(
      hintSpec.split(inputSizes.iterator(), attributeOf)
  );

  // Expected number of inputs in each split, in order.
  final int[] expectedSplitSizes = {3, 3, 3, 1};
  Assert.assertEquals(expectedSplitSizes.length, splits.size());
  for (int i = 0; i < expectedSplitSizes.length; i++) {
    Assert.assertEquals(expectedSplitSizes[i], splits.get(i).size());
  }
}

@Test
public void testEquals()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
Expand All @@ -33,6 +34,9 @@

import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -66,7 +70,7 @@ public void testCreateSplitsRespectingSplitHintSpec()
{
final long fileSize = 15;
final long maxSplitSize = 50;
final Set<File> files = prepareFiles(10, fileSize);
final Set<File> files = mockFiles(10, fileSize);
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
final List<InputSplit<List<File>>> splits = inputSource
.createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize))
Expand All @@ -83,7 +87,7 @@ public void testEstimateNumSplitsRespectingSplitHintSpec()
{
final long fileSize = 13;
final long maxSplitSize = 40;
final Set<File> files = prepareFiles(10, fileSize);
final Set<File> files = mockFiles(10, fileSize);
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
Assert.assertEquals(
4,
Expand All @@ -97,11 +101,19 @@ public void testGetFileIteratorWithBothBaseDirAndDuplicateFilesIteratingFilesOnl
File baseDir = temporaryFolder.newFolder();
List<File> filesInBaseDir = new ArrayList<>();
for (int i = 0; i < 10; i++) {
filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
final File file = File.createTempFile("local-input-source", ".data", baseDir);
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
writer.write("test");
}
filesInBaseDir.add(file);
}
Set<File> files = new HashSet<>(filesInBaseDir.subList(0, 5));
for (int i = 0; i < 3; i++) {
files.add(File.createTempFile("local-input-source", ".data", baseDir));
final File file = File.createTempFile("local-input-source", ".data", baseDir);
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
writer.write("test");
}
files.add(file);
}
Set<File> expectedFiles = new HashSet<>(filesInBaseDir);
expectedFiles.addAll(files);
Expand All @@ -117,7 +129,11 @@ public void testGetFileIteratorWithOnlyBaseDirIteratingAllFiles() throws IOExcep
File baseDir = temporaryFolder.newFolder();
Set<File> filesInBaseDir = new HashSet<>();
for (int i = 0; i < 10; i++) {
filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
final File file = File.createTempFile("local-input-source", ".data", baseDir);
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
writer.write("test");
}
filesInBaseDir.add(file);
}
Iterator<File> fileIterator = new LocalInputSource(baseDir, "*", null).getFileIterator();
Set<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet());
Expand All @@ -130,14 +146,28 @@ public void testGetFileIteratorWithOnlyFilesIteratingAllFiles() throws IOExcepti
File baseDir = temporaryFolder.newFolder();
Set<File> filesInBaseDir = new HashSet<>();
for (int i = 0; i < 10; i++) {
filesInBaseDir.add(File.createTempFile("local-input-source", ".data", baseDir));
final File file = File.createTempFile("local-input-source", ".data", baseDir);
try (Writer writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
writer.write("test");
}
filesInBaseDir.add(file);
}
Iterator<File> fileIterator = new LocalInputSource(null, null, filesInBaseDir).getFileIterator();
Set<File> actualFiles = Streams.sequentialStreamFrom(fileIterator).collect(Collectors.toSet());
Assert.assertEquals(filesInBaseDir, actualFiles);
}

private static Set<File> prepareFiles(int numFiles, long fileSize)
/**
 * Builds a LocalInputSource from files obtained via {@code mockFiles(10, 5)} and
 * {@code mockFiles(10, 0)} and checks that every file returned by its file iterator reports a
 * positive length.
 */
@Test
public void testFileIteratorWithEmptyFilesIteratingNonEmptyFilesOnly()
{
  final Set<File> mixedFiles = new HashSet<>();
  mixedFiles.addAll(mockFiles(10, 5));
  mixedFiles.addAll(mockFiles(10, 0));

  final LocalInputSource source = new LocalInputSource(null, null, mixedFiles);
  // Drain the iterator fully before checking anything.
  final List<File> returnedFiles = Lists.newArrayList(source.getFileIterator());

  // NOTE(review): this check also passes when the iterator yields no files at all;
  // consider additionally asserting the expected number of returned files.
  for (File returned : returnedFiles) {
    Assert.assertTrue(returned.length() > 0);
  }
}

private static Set<File> mockFiles(int numFiles, long fileSize)
{
final Set<File> files = new HashSet<>();
for (int i = 0; i < numFiles; i++) {
Expand Down
Loading