diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index aea58796b654..b227d6fe4df0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -37,6 +37,7 @@ import org.joda.time.Interval; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -151,20 +152,19 @@ public static HadoopIngestionSpec updateSegmentListIfDatasourcePathSpecIsUsed( String ingestionSpec = "ingestionSpec"; Map pathSpec = spec.getIOConfig().getPathSpec(); - Map datasourcePathSpec = null; + List> datasourcePathSpecs = new ArrayList<>(); if (pathSpec.get(type).equals(dataSource)) { - datasourcePathSpec = pathSpec; + datasourcePathSpecs.add(pathSpec); } else if (pathSpec.get(type).equals(multi)) { List> childPathSpecs = (List>) pathSpec.get(children); for (Map childPathSpec : childPathSpecs) { if (childPathSpec.get(type).equals(dataSource)) { - datasourcePathSpec = childPathSpec; - break; + datasourcePathSpecs.add(childPathSpec); } } } - if (datasourcePathSpec != null) { + for (Map datasourcePathSpec : datasourcePathSpecs) { Map ingestionSpecMap = (Map) datasourcePathSpec.get(ingestionSpec); DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue( ingestionSpecMap, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java index 50f51752cbec..0351a114e9c6 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java @@ -51,9 +51,12 @@ */ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest { - private final String testDatasource = "test"; - private final Interval testDatasourceInterval = Intervals.of("1970/3000"); - private final Interval testDatasourceIntervalPartial = Intervals.of("2050/3000"); + private static final String testDatasource = "test"; + private static final String testDatasource2 = "test2"; + private static final Interval testDatasourceInterval = Intervals.of("1970/3000"); + private static final Interval testDatasourceInterval2 = Intervals.of("2000/2001"); + private static final Interval testDatasourceIntervalPartial = Intervals.of("2050/3000"); + private final ObjectMapper jsonMapper; public HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest() @@ -67,7 +70,7 @@ public HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest() } private static final DataSegment SEGMENT = new DataSegment( - "test1", + testDatasource, Intervals.of("2000/3000"), "ver", ImmutableMap.of( @@ -81,6 +84,21 @@ public HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest() 2 ); + private static final DataSegment SEGMENT2 = new DataSegment( + testDatasource2, + Intervals.of("2000/3000"), + "ver2", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index2.zip" + ), + ImmutableList.of("host2"), + ImmutableList.of("visited_sum", "unique_hosts"), + NoneShardSpec.instance(), + 9, + 2 + ); + @Test public void testUpdateSegmentListIfDatasourcePathSpecIsUsedWithNoDatasourcePathSpec() throws Exception { @@ -213,6 +231,22 @@ public void testUpdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec( null ), null + ), + new DatasourcePathSpec( + jsonMapper, + null, + new DatasourceIngestionSpec( + testDatasource2, + testDatasourceInterval2, + null, + null, + null, + null, + null, + false, + null + ), + null ) ) ); @@ -224,6 +258,10 @@ public void testUpdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec( ImmutableList.of(WindowedDataSegment.of(SEGMENT)), ((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(1)).getSegments() ); + Assert.assertEquals( + ImmutableList.of(new WindowedDataSegment(SEGMENT2, testDatasourceInterval2)), + ((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(2)).getSegments() + ); } private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( @@ -259,9 +297,21 @@ private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsU ); UsedSegmentLister segmentLister = EasyMock.createMock(UsedSegmentLister.class); + EasyMock.expect( - segmentLister.getUsedSegmentsForIntervals(testDatasource, Lists.newArrayList(jobInterval)) + segmentLister.getUsedSegmentsForIntervals( + testDatasource, + Lists.newArrayList(jobInterval != null ? jobInterval.overlap(testDatasourceInterval) : null) + ) ).andReturn(ImmutableList.of(SEGMENT)); + + EasyMock.expect( + segmentLister.getUsedSegmentsForIntervals( + testDatasource2, + Lists.newArrayList(jobInterval != null ? jobInterval.overlap(testDatasourceInterval2) : null) + ) + ).andReturn(ImmutableList.of(SEGMENT2)); + EasyMock.replay(segmentLister); spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, jsonMapper, segmentLister);