diff --git a/docs/content/ingestion/update-existing-data.md b/docs/content/ingestion/update-existing-data.md index c1f7e54c939a..da3ae4507a62 100644 --- a/docs/content/ingestion/update-existing-data.md +++ b/docs/content/ingestion/update-existing-data.md @@ -76,7 +76,7 @@ For example #### `multi` -This is a composing inputSpec to combine other inputSpecs. This inputSpec is used for delta ingestion. Please note that you can have only one `dataSource` as child of `multi` inputSpec. +This is a composing inputSpec that combines other inputSpecs. It is used for delta ingestion. You can also use a `multi` inputSpec to combine data from multiple dataSources; however, each dataSource may be specified only once. Note that `useNewAggs` must be left at its default value of false to support delta ingestion. |Field|Type|Description|Required| diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java index 437e3073e60e..0c28ced24e64 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java @@ -21,14 +21,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.collect.Lists; import io.druid.data.input.InputRow; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.JobHelper; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,82 +56,74 @@ public class DatasourceInputFormat extends InputFormat { private static final Logger logger = new Logger(DatasourceInputFormat.class); - public static final String CONF_INPUT_SEGMENTS = "druid.segments"; - public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema"; - public static final String CONF_TRANSFORM_SPEC = "druid.datasource.transformSpec"; - public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size"; + private static final String CONF_DATASOURCES = "druid.datasource.input.datasources"; + private static final String CONF_SCHEMA = "druid.datasource.input.schema"; + private static final String CONF_SEGMENTS = "druid.datasource.input.segments"; + private static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.input.split.max.size"; @Override public List getSplits(JobContext context) throws IOException { JobConf conf = new JobConf(context.getConfiguration()); - String segmentsStr = Preconditions.checkNotNull( - conf.get(CONF_INPUT_SEGMENTS), - "No segments found to read" - ); - List segments = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( - segmentsStr, - new TypeReference>() - { - } - ); - if (segments == null || segments.size() == 0) { - throw new ISE("No segments found to read"); - } + List dataSources = getDataSources(conf); + List splits = new ArrayList<>(); - // Note: log is splitted into two lines so that a new String is not generated to print it. - // segmentsStr could be quite large when re-indexing multiple months of data.
- logger.info("Segment to read are..."); - logger.info(segmentsStr); + for (String dataSource : dataSources) { + List segments = getSegments(conf, dataSource); + if (segments == null || segments.size() == 0) { + throw new ISE("No segments found to read for dataSource[%s]", dataSource); + } - long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0); - if (maxSize < 0) { - long totalSize = 0; - for (WindowedDataSegment segment : segments) { - totalSize += segment.getSegment().getSize(); + // Note: Each segment is logged separately to avoid creating a huge String if we are loading lots of segments. + for (int i = 0; i < segments.size(); i++) { + final WindowedDataSegment segment = segments.get(i); + logger.info( + "Segment %,d/%,d for dataSource[%s] has identifier[%s], interval[%s]", + i, + segments.size(), + dataSource, + segment.getSegment().getIdentifier(), + segment.getInterval() + ); } - int mapTask = conf.getNumMapTasks(); - if (mapTask > 0) { - maxSize = totalSize / mapTask; + + long maxSize = getMaxSplitSize(conf, dataSource); + if (maxSize < 0) { + long totalSize = 0; + for (WindowedDataSegment segment : segments) { + totalSize += segment.getSegment().getSize(); + } + int mapTask = conf.getNumMapTasks(); + if (mapTask > 0) { + maxSize = totalSize / mapTask; + } } - } - if (maxSize > 0) { - //combining is to happen, let us sort the segments list by size so that they - //are combined appropriately - Collections.sort( - segments, - new Comparator() - { - @Override - public int compare(WindowedDataSegment s1, WindowedDataSegment s2) - { - return Long.compare(s1.getSegment().getSize(), s2.getSegment().getSize()); - } - } - ); - } + if (maxSize > 0) { + //combining is to happen, let us sort the segments list by size so that they + //are combined appropriately + segments.sort(Comparator.comparingLong(s -> s.getSegment().getSize())); + } + + List list = new ArrayList<>(); + long size = 0; - List splits = Lists.newArrayList(); + org.apache.hadoop.mapred.InputFormat fio = supplier.get(); + for (WindowedDataSegment segment : segments) { + if (size + segment.getSegment().getSize() > maxSize && size > 0) { + splits.add(toDataSourceSplit(list, fio, conf)); + list = new ArrayList<>(); + size = 0; + } - List list = new ArrayList<>(); - long size = 0; + list.add(segment); + size += segment.getSegment().getSize(); + } - org.apache.hadoop.mapred.InputFormat fio = supplier.get(); - for (WindowedDataSegment segment : segments) { - if (size + segment.getSegment().getSize() > maxSize && size > 0) { + if (list.size() > 0) { splits.add(toDataSourceSplit(list, fio, conf)); - list = Lists.newArrayList(); - size = 0; } - - list.add(segment); - size += segment.getSegment().getSize(); - } - - if (list.size() > 0) { - splits.add(toDataSourceSplit(list, fio, conf)); } logger.info("Number of splits [%d]", splits.size()); @@ -167,7 +159,7 @@ protected boolean isSplitable(FileSystem fs, Path file) protected FileStatus[] listStatus(JobConf job) throws IOException { // to avoid globbing which needs input path should be hadoop-compatible (':' is not acceptable in path, etc.) 
- List statusList = Lists.newArrayList(); + List statusList = new ArrayList<>(); for (Path path : FileInputFormat.getInputPaths(job)) { // load spec in segment points specifically zip file itself statusList.add(path.getFileSystem(job).getFileStatus(path)); @@ -250,4 +242,90 @@ static String[] getFrequentLocations(final Stream locations) .map(Map.Entry::getKey) .toArray(String[]::new); } + + public static List getDataSources(final Configuration conf) throws IOException + { + final String currentDatasources = conf.get(CONF_DATASOURCES); + + if (currentDatasources == null) { + return Collections.emptyList(); + } + + return HadoopDruidIndexerConfig.JSON_MAPPER.readValue( + currentDatasources, + new TypeReference>() {} + ); + } + + public static DatasourceIngestionSpec getIngestionSpec(final Configuration conf, final String dataSource) + throws IOException + { + final String specString = conf.get(StringUtils.format("%s.%s", CONF_SCHEMA, dataSource)); + if (specString == null) { + throw new NullPointerException(StringUtils.format("null spec for dataSource[%s]", dataSource)); + } + + final DatasourceIngestionSpec spec = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( + specString, + DatasourceIngestionSpec.class + ); + + if (spec.getDimensions() == null || spec.getDimensions().size() == 0) { + throw new ISE("load schema does not have dimensions"); + } + + if (spec.getMetrics() == null || spec.getMetrics().size() == 0) { + throw new ISE("load schema does not have metrics"); + } + + return spec; + } + + public static List getSegments(final Configuration conf, final String dataSource) + throws IOException + { + return HadoopDruidIndexerConfig.JSON_MAPPER.readValue( + conf.get(StringUtils.format("%s.%s", CONF_SEGMENTS, dataSource)), + new TypeReference>() {} + ); + } + + public static long getMaxSplitSize(final Configuration conf, final String dataSource) + { + return conf.getLong(StringUtils.format("%s.%s", CONF_MAX_SPLIT_SIZE, dataSource), 0L); + } + + public static void addDataSource( + final Configuration conf, + final DatasourceIngestionSpec spec, + final List segments, + final long maxSplitSize + ) throws IOException + { + final List dataSources = getDataSources(conf); + + if (dataSources.contains(spec.getDataSource())) { + throw new ISE("Oops, cannot load the same dataSource twice!"); + } + + final List newDataSources = new ArrayList<>(dataSources); + newDataSources.add(spec.getDataSource()); + + conf.set(CONF_DATASOURCES, HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(newDataSources)); + + conf.set( + StringUtils.format("%s.%s", CONF_SCHEMA, spec.getDataSource()), + HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(spec) + ); + + conf.set( + StringUtils.format("%s.%s", CONF_SEGMENTS, spec.getDataSource()), + HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(segments) + ); + + conf.set( + StringUtils.format("%s.%s", CONF_MAX_SPLIT_SIZE, spec.getDataSource()), + String.valueOf(maxSplitSize) + ); + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java index 74220adc6b65..12170be38dc0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java @@ -19,10 +19,9 @@ package io.druid.indexer.hadoop; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import 
com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.io.Closeables; import com.google.common.io.Files; @@ -30,14 +29,12 @@ import io.druid.data.input.Row; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.JobHelper; -import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.segment.realtime.firehose.WindowedStorageAdapter; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; @@ -63,11 +60,18 @@ public class DatasourceRecordReader extends RecordReader private int numRows; @Override - public void initialize(InputSplit split, final TaskAttemptContext context) + public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException { - spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.JSON_MAPPER); - List segments = ((DatasourceInputSplit) split).getSegments(); + String dataSource = Iterators.getOnlyElement( + segments.stream() + .map(s -> s.getSegment().getDataSource()) + .distinct() + .iterator() + ); + + spec = DatasourceInputFormat.getIngestionSpec(context.getConfiguration(), dataSource); + logger.info("load schema [%s]", spec); List adapters = Lists.transform( segments, @@ -160,27 +164,4 @@ public void close() throws IOException FileUtils.deleteDirectory(dir); } } - - private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper) - { - try { - String schema = Preconditions.checkNotNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema"); - logger.info("load schema [%s]", schema); - - DatasourceIngestionSpec spec = jsonMapper.readValue(schema, DatasourceIngestionSpec.class); - - if (spec.getDimensions() == null || spec.getDimensions().size() == 0) { - throw new ISE("load schema does not have dimensions"); - } - - if (spec.getMetrics() == null || spec.getMetrics().size() == 0) { - throw new ISE("load schema does not have metrics"); - } - - return spec; - } - catch (IOException ex) { - throw new RuntimeException("couldn't load segment load spec", ex); - } - } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java index d7cdf89cfbc9..7a9da80115eb 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -190,11 +190,8 @@ public Iterable apply(WindowedDataSegment dataSegment) config.getSchema().getDataSchema().getTransformSpec() ); - job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec)); - job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); - job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); + DatasourceInputFormat.addDataSource(job.getConfiguration(), updatedIngestionSpec, segments, maxSplitSize); MultipleInputs.addInputPath(job, new 
Path("/dummy/tobe/ignored"), DatasourceInputFormat.class); - return job; } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index b7bb444c1cc7..3970e23fc1b7 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -119,7 +119,7 @@ public void testReindexing() throws Exception "ingestionSpec", ImmutableMap.of( "dataSource", - "xyz", + "testds", "interval", INTERVAL_FULL ), @@ -180,7 +180,7 @@ public void testReindexingWithNewAggregators() throws Exception "ingestionSpec", ImmutableMap.of( "dataSource", - "xyz", + "testds", "interval", INTERVAL_FULL ), @@ -239,7 +239,7 @@ public void testReindexingWithPartialWindow() throws Exception "ingestionSpec", ImmutableMap.of( "dataSource", - "xyz", + "testds", "interval", INTERVAL_FULL ), @@ -314,7 +314,7 @@ public void testDeltaIngestion() throws Exception "ingestionSpec", ImmutableMap.of( "dataSource", - "xyz", + "testds", "interval", INTERVAL_FULL ), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java index 1b4af099662f..1f9ab9b9a9b5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java @@ -23,14 +23,16 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Files; import io.druid.indexer.JobHelper; -import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.JodaUtils; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -51,8 +53,10 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -62,7 +66,8 @@ public class DatasourceInputFormatTest @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - private List segments; + private List segments1; + private List segments2; private List locations; private JobConf config; private JobContext context; @@ -70,7 +75,7 @@ public class DatasourceInputFormatTest @Before public void setUp() throws Exception { - segments = ImmutableList.of( + segments1 = ImmutableList.of( WindowedDataSegment.of( new DataSegment( "test1", @@ -89,7 +94,7 @@ public void setUp() throws Exception ), WindowedDataSegment.of( new DataSegment( - "test2", + "test1", Intervals.of("2050/3000"), "ver", ImmutableMap.of( @@ -105,7 +110,7 @@ public void setUp() throws Exception ), WindowedDataSegment.of( new DataSegment( - "test3", + "test1", Intervals.of("2030/3000"), "ver", ImmutableMap.of( @@ -121,9 +126,29 @@ public void setUp() throws Exception ) ); - Path path1 = new Path(JobHelper.getURIFromSegment(segments.get(0).getSegment())); - Path 
path2 = new Path(JobHelper.getURIFromSegment(segments.get(1).getSegment())); - Path path3 = new Path(JobHelper.getURIFromSegment(segments.get(2).getSegment())); + segments2 = ImmutableList.of( + WindowedDataSegment.of( + new DataSegment( + "test2", + Intervals.of("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index4.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + NoneShardSpec.instance(), + 9, + 2 + ) + ) + ); + + Path path1 = new Path(JobHelper.getURIFromSegment(segments1.get(0).getSegment())); + Path path2 = new Path(JobHelper.getURIFromSegment(segments1.get(1).getSegment())); + Path path3 = new Path(JobHelper.getURIFromSegment(segments1.get(2).getSegment())); + Path path4 = new Path(JobHelper.getURIFromSegment(segments2.get(0).getSegment())); // dummy locations for test locations = ImmutableList.of( @@ -140,7 +165,7 @@ public void setUp() throws Exception new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000), new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200), new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100), - new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700), + new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700) } ), new LocatedFileStatus( @@ -148,21 +173,56 @@ public void setUp() throws Exception new BlockLocation[]{ new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500) } + ), + new LocatedFileStatus( + 500, false, 0, 0, 0, 0, null, null, null, null, path4, + new BlockLocation[]{ + new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500) + } ) ); - config = new JobConf(); - config.set( - DatasourceInputFormat.CONF_INPUT_SEGMENTS, - new DefaultObjectMapper().writeValueAsString(segments) - ); - + config = populateConfiguration(new JobConf(), segments1, 0); context = EasyMock.createMock(JobContext.class); EasyMock.expect(context.getConfiguration()).andReturn(config); EasyMock.replay(context); } - private Supplier testFormatter = new Supplier() { + private static T populateConfiguration( + final T conf, + final List segments, + final long maxSplitSize + ) + throws IOException + { + DatasourceInputFormat.addDataSource( + conf, + new DatasourceIngestionSpec( + Iterators.getOnlyElement(segments.stream().map(s -> s.getSegment().getDataSource()).distinct().iterator()), + null, + Collections.singletonList( + JodaUtils.umbrellaInterval( + segments.stream() + .map(WindowedDataSegment::getInterval) + .collect(Collectors.toList()) + ) + ), + null, + null, + null, + null, + false, + null + ), + segments, + maxSplitSize + ); + + return conf; + } + + private Supplier testFormatter = new Supplier() + { @Override public InputFormat get() { @@ -202,25 +262,47 @@ public void testGetSplitsNoCombining() throws Exception DatasourceInputFormat inputFormat = new DatasourceInputFormat().setSupplier(testFormatter); List splits = inputFormat.getSplits(context); - Assert.assertEquals(segments.size(), splits.size()); - for (int i = 0; i < segments.size(); i++) { + Assert.assertEquals(segments1.size(), splits.size()); + for (int i = 0; i < segments1.size(); i++) { DatasourceInputSplit split = (DatasourceInputSplit) splits.get(i); - Assert.assertEquals(segments.get(i), split.getSegments().get(0)); + Assert.assertEquals(segments1.get(i), split.getSegments().get(0)); } - Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(0).getLocations()); - Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(1).getLocations()); - Assert.assertArrayEquals(new String[] {"s2", "s3"}, 
splits.get(2).getLocations()); + Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(0).getLocations()); + Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations()); + Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(2).getLocations()); + } + + @Test + public void testGetSplitsTwoDataSources() throws Exception + { + config.clear(); + populateConfiguration(populateConfiguration(config, segments1, 999999), segments2, 999999); + List splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context); + + Assert.assertEquals(2, splits.size()); + Assert.assertEquals( + Sets.newHashSet(segments1), + Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) + ); + Assert.assertEquals( + Sets.newHashSet(segments2), + Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) + ); + + Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations()); + Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(1).getLocations()); } @Test public void testGetSplitsAllCombined() throws Exception { - config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999"); + config.clear(); + populateConfiguration(config, segments1, 999999); List splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context); Assert.assertEquals(1, splits.size()); Assert.assertEquals( - Sets.newHashSet(segments), + Sets.newHashSet(segments1), Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) ); @@ -230,19 +312,20 @@ public void testGetSplitsAllCombined() throws Exception @Test public void testGetSplitsCombineInTwo() throws Exception { - config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6"); + config.clear(); + populateConfiguration(config, segments1, 6); List splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context); Assert.assertEquals(2, splits.size()); Assert.assertEquals( - Sets.newHashSet(segments.get(0), segments.get(2)), + Sets.newHashSet(segments1.get(0), segments1.get(2)), Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) ); Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations()); Assert.assertEquals( - Sets.newHashSet(segments.get(1)), + Sets.newHashSet(segments1.get(1)), Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) ); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations()); @@ -251,26 +334,27 @@ public void testGetSplitsCombineInTwo() throws Exception @Test public void testGetSplitsCombineCalculated() throws Exception { - config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "-1"); + config.clear(); + populateConfiguration(config, segments1, -1); config.setNumMapTasks(3); List splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context); Assert.assertEquals(3, splits.size()); Assert.assertEquals( - Sets.newHashSet(segments.get(0)), + Sets.newHashSet(segments1.get(0)), Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) ); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(0).getLocations()); Assert.assertEquals( - Sets.newHashSet(segments.get(2)), + Sets.newHashSet(segments1.get(2)), Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) ); Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(1).getLocations()); Assert.assertEquals( - Sets.newHashSet(segments.get(1)), + Sets.newHashSet(segments1.get(1)), Sets.newHashSet((((DatasourceInputSplit) 
splits.get(2)).getSegments())) ); Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(2).getLocations()); @@ -302,12 +386,7 @@ public void testGetSplitsUsingDefaultSupplier() throws Exception ) ); - final JobConf myConfig = new JobConf(); - myConfig.set( - DatasourceInputFormat.CONF_INPUT_SEGMENTS, - new DefaultObjectMapper().writeValueAsString(mySegments) - ); - + final JobConf myConfig = populateConfiguration(new JobConf(), mySegments, 0L); final JobContext myContext = EasyMock.createMock(JobContext.class); EasyMock.expect(myContext.getConfiguration()).andReturn(myConfig); EasyMock.replay(myContext); @@ -367,7 +446,7 @@ public void testGetLocationsInputFormatException() throws IOException Assert.assertEquals( 0, - DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count() + DatasourceInputFormat.getLocations(segments1.subList(0, 1), fio, config).count() ); } @@ -383,7 +462,7 @@ public void testGetLocationsSplitException() throws IOException ); EasyMock.expect(fio.getSplits(config, 1)).andReturn( - new org.apache.hadoop.mapred.InputSplit[] {split} + new org.apache.hadoop.mapred.InputSplit[]{split} ); EasyMock.expect(split.getLocations()).andThrow(new IOException("testing")); @@ -391,7 +470,7 @@ public void testGetLocationsSplitException() throws IOException Assert.assertEquals( 0, - DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count() + DatasourceInputFormat.getLocations(segments1.subList(0, 1), fio, config).count() ); } @@ -407,25 +486,25 @@ public void testGetLocations() throws IOException ); EasyMock.expect(fio.getSplits(config, 1)).andReturn( - new org.apache.hadoop.mapred.InputSplit[] {split} + new org.apache.hadoop.mapred.InputSplit[]{split} ); - EasyMock.expect(split.getLocations()).andReturn(new String[] {"s1", "s2"}); + EasyMock.expect(split.getLocations()).andReturn(new String[]{"s1", "s2"}); EasyMock.expect(fio.getSplits(config, 1)).andReturn( - new org.apache.hadoop.mapred.InputSplit[] {split} + new org.apache.hadoop.mapred.InputSplit[]{split} ); - EasyMock.expect(split.getLocations()).andReturn(new String[] {"s3"}); + EasyMock.expect(split.getLocations()).andReturn(new String[]{"s3"}); EasyMock.expect(fio.getSplits(config, 1)).andReturn( - new org.apache.hadoop.mapred.InputSplit[] {split} + new org.apache.hadoop.mapred.InputSplit[]{split} ); - EasyMock.expect(split.getLocations()).andReturn(new String[] {"s4", "s2"}); + EasyMock.expect(split.getLocations()).andReturn(new String[]{"s4", "s2"}); EasyMock.replay(fio, split); Assert.assertArrayEquals( - new String[] {"s1", "s2", "s3", "s4", "s2"}, - DatasourceInputFormat.getLocations(segments, fio, config).toArray(String[]::new) + new String[]{"s1", "s2", "s3", "s4", "s2"}, + DatasourceInputFormat.getLocations(segments1, fio, config).toArray(String[]::new) ); } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java index 65947dac3d0a..561449171e17 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -59,21 +60,21 @@ public void testSanity() throws Exception InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)), null); 
Configuration config = new Configuration(); - config.set( - DatasourceInputFormat.CONF_DRUID_SCHEMA, - HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString( - new DatasourceIngestionSpec( - segment.getDataSource(), - segment.getInterval(), - null, - null, - null, - segment.getDimensions(), - segment.getMetrics(), - false, - null - ) - ) + DatasourceInputFormat.addDataSource( + config, + new DatasourceIngestionSpec( + segment.getDataSource(), + segment.getInterval(), + null, + null, + null, + segment.getDimensions(), + segment.getMetrics(), + false, + null + ), + Collections.emptyList(), + 0 ); TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index e98c676761e1..b3770fba92e2 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -19,7 +19,6 @@ package io.druid.indexer.path; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -60,6 +59,7 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -67,12 +67,14 @@ */ public class DatasourcePathSpecTest { - private DatasourceIngestionSpec ingestionSpec; - private List segments; + private DatasourceIngestionSpec ingestionSpec1; + private DatasourceIngestionSpec ingestionSpec2; + private List segments1; + private List segments2; public DatasourcePathSpecTest() { - this.ingestionSpec = new DatasourceIngestionSpec( + this.ingestionSpec1 = new DatasourceIngestionSpec( "test", Intervals.of("2000/3000"), null, @@ -84,10 +86,22 @@ public DatasourcePathSpecTest() null ); - segments = ImmutableList.of( + this.ingestionSpec2 = new DatasourceIngestionSpec( + "test2", + Intervals.of("2000/3000"), + null, + null, + null, + null, + null, + false, + null + ); + + segments1 = ImmutableList.of( WindowedDataSegment.of( new DataSegment( - ingestionSpec.getDataSource(), + ingestionSpec1.getDataSource(), Intervals.of("2000/3000"), "ver", ImmutableMap.of( @@ -103,7 +117,7 @@ public DatasourcePathSpecTest() ), WindowedDataSegment.of( new DataSegment( - ingestionSpec.getDataSource(), + ingestionSpec1.getDataSource(), Intervals.of("2050/3000"), "ver", ImmutableMap.of( @@ -118,6 +132,25 @@ public DatasourcePathSpecTest() ) ) ); + + segments2 = ImmutableList.of( + WindowedDataSegment.of( + new DataSegment( + ingestionSpec2.getDataSource(), + Intervals.of("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp2/index.zip" + ), + ImmutableList.of("product2"), + ImmutableList.of("visited_sum2", "unique_hosts2"), + NoneShardSpec.instance(), + 9, + 12334 + ) + ) + ); } @Test @@ -137,7 +170,9 @@ public void configure(Binder binder) { binder.bind(UsedSegmentLister.class).toInstance(segmentList); JsonConfigProvider.bindInstance( - binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null, null, true, false) + binder, + Key.get(DruidNode.class, Self.class), + new DruidNode("dummy-node", null, null, null, true, false) ); } } @@ -149,7 +184,7 @@ public void configure(Binder binder) DatasourcePathSpec expected = new DatasourcePathSpec( jsonMapper, null, - ingestionSpec, + ingestionSpec1, 
Long.valueOf(10), false ); @@ -159,7 +194,7 @@ public void configure(Binder binder) expected = new DatasourcePathSpec( jsonMapper, null, - ingestionSpec, + ingestionSpec1, null, false ); @@ -168,8 +203,8 @@ public void configure(Binder binder) expected = new DatasourcePathSpec( jsonMapper, - segments, - ingestionSpec, + segments1, + ingestionSpec1, null, false ); @@ -178,8 +213,8 @@ public void configure(Binder binder) expected = new DatasourcePathSpec( jsonMapper, - segments, - ingestionSpec, + segments1, + ingestionSpec1, null, true ); @@ -194,10 +229,18 @@ public void testAddInputPaths() throws Exception ObjectMapper mapper = TestHelper.makeJsonMapper(); - DatasourcePathSpec pathSpec = new DatasourcePathSpec( + DatasourcePathSpec pathSpec1 = new DatasourcePathSpec( mapper, - segments, - ingestionSpec, + segments1, + ingestionSpec1, + null, + false + ); + + DatasourcePathSpec pathSpec2 = new DatasourcePathSpec( + mapper, + segments2, + ingestionSpec2, null, false ); @@ -207,25 +250,29 @@ public void testAddInputPaths() throws Exception EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes(); EasyMock.replay(job); - pathSpec.addInputPaths(hadoopIndexerConfig, job); - List actualSegments = mapper.readValue( - config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS), - new TypeReference>() - { - } + pathSpec1.addInputPaths(hadoopIndexerConfig, job); + pathSpec2.addInputPaths(hadoopIndexerConfig, job); + + Assert.assertEquals( + ImmutableList.of(ingestionSpec1.getDataSource(), ingestionSpec2.getDataSource()), + DatasourceInputFormat.getDataSources(config) ); - Assert.assertEquals(segments, actualSegments); + Assert.assertEquals(segments1, DatasourceInputFormat.getSegments(config, ingestionSpec1.getDataSource())); + Assert.assertEquals(segments2, DatasourceInputFormat.getSegments(config, ingestionSpec2.getDataSource())); - DatasourceIngestionSpec actualIngestionSpec = mapper.readValue( - config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), - DatasourceIngestionSpec.class - ); Assert.assertEquals( - ingestionSpec + ingestionSpec1 .withDimensions(ImmutableList.of("product")) .withMetrics(ImmutableList.of("visited_sum")), - actualIngestionSpec + DatasourceInputFormat.getIngestionSpec(config, ingestionSpec1.getDataSource()) + ); + + Assert.assertEquals( + ingestionSpec2 + .withDimensions(ImmutableList.of("product2")) + .withMetrics(ImmutableList.of("visited_sum")), + DatasourceInputFormat.getIngestionSpec(config, ingestionSpec2.getDataSource()) ); } @@ -239,7 +286,7 @@ public void testAddInputPathsWithNoSegments() throws Exception DatasourcePathSpec pathSpec = new DatasourcePathSpec( mapper, null, - ingestionSpec, + ingestionSpec1, null, false ); @@ -261,22 +308,22 @@ public void testAddInputPathsWithNoSegments() throws Exception pathSpec = new DatasourcePathSpec( mapper, null, - ingestionSpec.withIgnoreWhenNoSegments(true), + ingestionSpec1.withIgnoreWhenNoSegments(true), null, false ); pathSpec.addInputPaths(hadoopIndexerConfig, job); - Assert.assertNull(config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS)); - Assert.assertNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA)); + Assert.assertEquals(Collections.emptyList(), DatasourceInputFormat.getDataSources(config)); } + @SuppressWarnings("unchecked") private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig() { return new HadoopDruidIndexerConfig( new HadoopIngestionSpec( new DataSchema( - ingestionSpec.getDataSource(), + ingestionSpec1.getDataSource(), HadoopDruidIndexerConfig.JSON_MAPPER.convertValue( new 
StringInputRowParser( new CSVParseSpec(
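
For readers of this patch, the following minimal sketch (not part of the diff) illustrates the new per-dataSource configuration API that this change adds to `DatasourceInputFormat`: two dataSources are registered on a single Hadoop `Configuration` via `addDataSource` and then read back through the new static accessors. The class name, interval, segment values, and dimension/metric names are invented for illustration; the `DatasourceIngestionSpec` and `DataSegment` constructor arguments simply mirror the ones used in the tests above.

// Illustrative sketch only, assuming the API introduced by this patch.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.java.util.common.Intervals;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class MultiDatasourceConfExample
{
  public static void main(String[] args) throws IOException
  {
    final Configuration conf = new Configuration();

    // Register two dataSources; addDataSource throws if the same name is added twice.
    addOne(conf, "wikipedia", "/tmp/index1.zip");
    addOne(conf, "lineitem", "/tmp/index2.zip");

    // Everything needed per dataSource (schema, segments, max split size) can now be
    // recovered from the single Configuration by the input format.
    for (String dataSource : DatasourceInputFormat.getDataSources(conf)) {
      DatasourceIngestionSpec spec = DatasourceInputFormat.getIngestionSpec(conf, dataSource);
      List<WindowedDataSegment> segments = DatasourceInputFormat.getSegments(conf, dataSource);
      long maxSplitSize = DatasourceInputFormat.getMaxSplitSize(conf, dataSource);
      System.out.printf("%s: %d segment(s), maxSplitSize=%d%n", spec.getDataSource(), segments.size(), maxSplitSize);
    }
  }

  private static void addOne(Configuration conf, String dataSource, String path) throws IOException
  {
    // Hypothetical segment; values are placeholders, not real data.
    final DataSegment segment = new DataSegment(
        dataSource,
        Intervals.of("2000/3000"),
        "ver",
        ImmutableMap.of("type", "local", "path", path),
        ImmutableList.of("dim"),
        ImmutableList.of("met"),
        NoneShardSpec.instance(),
        9,
        1024
    );

    // Spec must carry non-empty dimensions and metrics so getIngestionSpec() accepts it.
    final DatasourceIngestionSpec spec = new DatasourceIngestionSpec(
        dataSource,
        Intervals.of("2000/3000"),
        null,   // intervals
        null,   // segments
        null,   // filter
        ImmutableList.of("dim"),
        ImmutableList.of("met"),
        false,  // ignoreWhenNoSegments
        null    // transformSpec
    );

    DatasourceInputFormat.addDataSource(conf, spec, Collections.singletonList(WindowedDataSegment.of(segment)), 0L);
  }
}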