Merged
@@ -20,14 +20,25 @@
package io.druid.indexer.hadoop;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import io.druid.collections.CountingMap;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -36,9 +47,13 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;

public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
@@ -89,9 +104,11 @@ public int compare(WindowedDataSegment s1, WindowedDataSegment s2)
List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;

JobConf dummyConf = new JobConf();
Contributor:
How is this supposed to work? The first line of this method says:

Configuration conf = context.getConfiguration();

However, the values in this configuration are ignored, and a blank "dummy" config is passed on instead. This causes exceptions like the ones described in #5135.

Contributor:
I think you're right. Rewriting it to use org.apache.hadoop.mapreduce.lib.input.FileInputFormat and the JobContext supplied in the getSplits(JobContext) argument would probably solve #5135. Please feel free to submit a PR with a patch that does so, after verifying it.
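
A minimal sketch of that suggested rewrite (not code from this PR; the method name and wiring are illustrative, and it assumes Hadoop 2.x's new-API classes org.apache.hadoop.mapreduce.Job and org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}):

// Sketch only: probe block locations with the new-API FileInputFormat,
// deriving each probe's configuration from the caller's JobContext so
// filesystem and security settings are not lost (the failure mode in #5135).
private static List<String> probeLocations(List<WindowedDataSegment> segments, JobContext context)
    throws IOException, InterruptedException
{
  final List<String> hosts = new ArrayList<>();
  for (WindowedDataSegment segment : segments) {
    // copy the real configuration instead of starting from a blank JobConf
    final Job job = Job.getInstance(new Configuration(context.getConfiguration()));
    FileInputFormat.setInputPaths(job, new Path(JobHelper.getURIFromSegment(segment.getSegment())));
    for (org.apache.hadoop.mapreduce.InputSplit split : new TextInputFormat().getSplits(job)) {
      hosts.addAll(Arrays.asList(split.getLocations()));
    }
  }
  return hosts;
}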

org.apache.hadoop.mapred.InputFormat fio = supplier.get();
Contributor:
both these lines can be moved inside toDataSourceSplit() itself.

Contributor Author:
JobConf is rather a big object and I wanted to reuse it if possible.

Contributor:
ok, didn't realize it was used twice.

Contributor:
> JobConf is rather a big object and I wanted to reuse it if possible.

How come it is a big object? You literally create it completely empty.

for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(new DatasourceInputSplit(list));
splits.add(toDataSourceSplit(list, fio, dummyConf));
list = Lists.newArrayList();
size = 0;
}
@@ -101,7 +118,7 @@ public int compare(WindowedDataSegment s1, WindowedDataSegment s2)
}

if (list.size() > 0) {
splits.add(new DatasourceInputSplit(list));
splits.add(toDataSourceSplit(list, fio, dummyConf));
}

logger.info("Number of splits [%d]", splits.size());
@@ -116,4 +133,85 @@ public RecordReader<NullWritable, InputRow> createRecordReader(
{
return new DatasourceRecordReader();
}

private Supplier<org.apache.hadoop.mapred.InputFormat> supplier = new Supplier<org.apache.hadoop.mapred.InputFormat>()
{
@Override
public org.apache.hadoop.mapred.InputFormat get()
{
return new TextInputFormat();
}
};

@VisibleForTesting
DatasourceInputFormat setSupplier(Supplier<org.apache.hadoop.mapred.InputFormat> supplier) {
this.supplier = supplier;
return this;
}

private DatasourceInputSplit toDataSourceSplit(
List<WindowedDataSegment> segments,
org.apache.hadoop.mapred.InputFormat fio,
JobConf conf
)
{
String[] locations = null;
try {
locations = getFrequentLocations(segments, fio, conf);
}
catch (Exception e) {
logger.error("Exception thrown finding location of splits", e);
}
return new DatasourceInputSplit(segments, locations);
}

private String[] getFrequentLocations(
List<WindowedDataSegment> segments,
org.apache.hadoop.mapred.InputFormat fio,
JobConf conf
) throws IOException
{
Iterable<String> locations = Collections.emptyList();
for (WindowedDataSegment segment : segments) {
FileInputFormat.setInputPaths(conf, new Path(JobHelper.getURIFromSegment(segment.getSegment())));
for (org.apache.hadoop.mapred.InputSplit split : fio.getSplits(conf, 1)) {
locations = Iterables.concat(locations, Arrays.asList(split.getLocations()));
}
}
return getFrequentLocations(locations);
}

private static String[] getFrequentLocations(Iterable<String> hosts) {

final CountingMap<String> counter = new CountingMap<>();
for (String location : hosts) {
counter.add(location, 1);
}

final TreeSet<Pair<Long, String>> sorted = Sets.<Pair<Long, String>>newTreeSet(
new Comparator<Pair<Long, String>>()
{
@Override
public int compare(Pair<Long, String> o1, Pair<Long, String> o2)
{
int compare = o2.lhs.compareTo(o1.lhs); // descending
if (compare == 0) {
compare = o1.rhs.compareTo(o2.rhs); // ascending
}
return compare;
}
}
);

for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
sorted.add(Pair.of(entry.getValue().get(), entry.getKey()));
Contributor:
nit: using TreeMap instead of TreeSet would avoid Pair creation.
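
For illustration, one way to drop the Pair allocation (a sketch under the same CountingMap input; it sorts the counter's entries directly rather than using the TreeMap the comment mentions):

// Sketch only: sort Map.Entry values (count descending, host ascending)
// instead of copying each entry into a Pair inside a TreeSet.
final List<Map.Entry<String, AtomicLong>> entries = Lists.newArrayList(counter.entrySet());
Collections.sort(
    entries,
    new Comparator<Map.Entry<String, AtomicLong>>()
    {
      @Override
      public int compare(Map.Entry<String, AtomicLong> e1, Map.Entry<String, AtomicLong> e2)
      {
        int compare = Long.compare(e2.getValue().get(), e1.getValue().get()); // descending count
        if (compare == 0) {
          compare = e1.getKey().compareTo(e2.getKey()); // ascending host
        }
        return compare;
      }
    }
);
// then take the first three entries' keys, as the existing code does with 'sorted'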

}

// use default replication factor, if possible
final List<String> locations = Lists.newArrayListWithCapacity(3);
for (Pair<Long, String> frequent : Iterables.limit(sorted, 3)) {
locations.add(frequent.rhs);
}
return locations.toArray(new String[locations.size()]);
}
}
@@ -33,17 +33,21 @@

public class DatasourceInputSplit extends InputSplit implements Writable
{
private static final String[] EMPTY_STR_ARRAY = new String[0];

private List<WindowedDataSegment> segments = null;
private String[] locations = null;

//required for deserialization
public DatasourceInputSplit()
{
}

public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments)
public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments, String[] locations)
{
Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments");
this.segments = segments;
this.locations = locations == null ? EMPTY_STR_ARRAY : locations;
}

@Override
@@ -59,7 +63,7 @@ public long getLength() throws IOException, InterruptedException
@Override
public String[] getLocations() throws IOException, InterruptedException
{
return new String[]{};
return locations;
}

public List<WindowedDataSegment> getSegments()
@@ -71,6 +75,10 @@ public List<WindowedDataSegment> getSegments()
public void write(DataOutput out) throws IOException
{
out.writeUTF(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(segments));
out.writeInt(locations.length);
for (String location : locations) {
out.writeUTF(location);
}
}

@Override
@@ -82,5 +90,9 @@ public void readFields(DataInput in) throws IOException
{
}
);
locations = new String[in.readInt()];
for (int i = 0; i < locations.length; i++) {
locations[i] = in.readUTF();
}
}
Contributor:
can you add/update serde tests in DatasourceInputSplitTest?

Contributor Author:
done
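
For reference, a round-trip serde test along those lines might look like this (a sketch, assuming the usual JUnit and java.io imports; the segment fixture name is illustrative, not necessarily the test that landed):

@Test
public void testSerde() throws Exception
{
  // 'segment' is assumed to be a DataSegment fixture set up elsewhere in the test
  DatasourceInputSplit expected = new DatasourceInputSplit(
      Lists.newArrayList(WindowedDataSegment.of(segment)),
      new String[]{"s1", "s2"}
  );

  // serialize via write(DataOutput), then read back via readFields(DataInput)
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  expected.write(new DataOutputStream(bytes));

  DatasourceInputSplit actual = new DatasourceInputSplit();
  actual.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

  Assert.assertEquals(expected.getSegments(), actual.getSegments());
  Assert.assertArrayEquals(expected.getLocations(), actual.getLocations());
}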

}
@@ -593,7 +593,7 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOException
@Override
public InputSplit apply(DataSegment input)
{
return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)));
return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)), null);
}
}
);
@@ -19,13 +19,24 @@

package io.druid.indexer.hadoop;

import com.google.api.client.util.Maps;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexer.JobHelper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.easymock.EasyMock;
@@ -34,13 +45,17 @@
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
*/
public class DatasourceInputFormatTest
{
private List<WindowedDataSegment> segments;
private List<LocatedFileStatus> locations;
private Configuration config;
private JobContext context;

@@ -98,6 +113,36 @@ public void setUp() throws Exception
)
);

Path path1 = new Path(JobHelper.getURIFromSegment(segments.get(0).getSegment()));
Path path2 = new Path(JobHelper.getURIFromSegment(segments.get(1).getSegment()));
Path path3 = new Path(JobHelper.getURIFromSegment(segments.get(2).getSegment()));

// dummy locations for test
locations = ImmutableList.of(
new LocatedFileStatus(
1000, false, 0, 0, 0, 0, null, null, null, null, path1,
new BlockLocation[]{
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 600),
new BlockLocation(null, new String[]{"s2", "s3"}, 600, 400)
}
),
new LocatedFileStatus(
4000, false, 0, 0, 0, 0, null, null, null, null, path2,
new BlockLocation[]{
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000),
new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200),
new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100),
new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700),
}
),
new LocatedFileStatus(
500, false, 0, 0, 0, 0, null, null, null, null, path3,
new BlockLocation[]{
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
}
)
);

config = new Configuration();
config.set(
DatasourceInputFormat.CONF_INPUT_SEGMENTS,
@@ -109,47 +154,89 @@ public void setUp() throws Exception
EasyMock.replay(context);
}

private Supplier<InputFormat> testFormatter = new Supplier<InputFormat>() {
@Override
public InputFormat get()
{
final Map<String, LocatedFileStatus> locationMap = Maps.newHashMap();
for (LocatedFileStatus status : locations) {
locationMap.put(status.getPath().getName(), status);
}

return new TextInputFormat()
{
@Override
protected boolean isSplitable(FileSystem fs, Path file) {
return false;
}

@Override
protected FileStatus[] listStatus(JobConf job) throws IOException
{
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
FileStatus[] status = new FileStatus[dirs.length];
for (int i = 0; i < dirs.length; i++) {
status[i] = locationMap.get(dirs[i].getName());
}
return status;
}
};
}
};

@Test
public void testGetSplitsNoCombining() throws Exception
{
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
DatasourceInputFormat inputFormat = new DatasourceInputFormat().setSupplier(testFormatter);
List<InputSplit> splits = inputFormat.getSplits(context);

Assert.assertEquals(segments.size(), splits.size());
for (int i = 0; i < segments.size(); i++) {
Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0));
DatasourceInputSplit split = (DatasourceInputSplit) splits.get(i);
Assert.assertEquals(segments.get(i), split.getSegments().get(0));
}
Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(0).getLocations());
Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(1).getLocations());
Assert.assertArrayEquals(new String[] {"s2", "s3"}, splits.get(2).getLocations());
}

@Test
public void testGetSplitsAllCombined() throws Exception
{
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);

Assert.assertEquals(1, splits.size());
Assert.assertEquals(
Sets.newHashSet(segments),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);

Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());
}

@Test
public void testGetSplitsCombineInTwo() throws Exception
{
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);

Assert.assertEquals(2, splits.size());

Assert.assertEquals(
Sets.newHashSet(segments.get(0), segments.get(2)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());

Assert.assertEquals(
Sets.newHashSet(segments.get(1)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
);
Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
}

@Test