diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java index 48283bdd78a2..a7b8cb6b0a81 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java @@ -20,6 +20,8 @@ package org.apache.druid.msq.indexing; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import org.apache.druid.client.ImmutableSegmentLoadInfo; @@ -41,12 +43,14 @@ import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -157,24 +161,25 @@ private Set getPrunedSegmentSet(final TableInputSpec ta { final TimelineLookup timeline = getTimeline(tableInputSpec.getDataSource(), tableInputSpec.getIntervals()); + final Predicate segmentFilter = tableInputSpec.getSegments() != null + ? Set.copyOf(tableInputSpec.getSegments())::contains + : Predicates.alwaysTrue(); if (timeline == null) { return Collections.emptySet(); } else { + // A segment can overlap with multiple search intervals, or even outside search intervals. 
+ // The same segment can appear multiple times or zero times, but each occurrence is bounded within the overlapping search interval final Iterator dataSegmentIterator = tableInputSpec.getIntervals().stream() + .flatMap(interval -> timeline.lookup(interval).stream()) + .flatMap( holder -> StreamSupport.stream(holder.getObject().spliterator(), false) - .filter(chunk -> !chunk.getObject().isTombstone()) - .map( - chunk -> - new DataSegmentWithInterval( - chunk.getObject(), - holder.getInterval() - ) - ) + .map(PartitionChunk::getObject) + .filter(segment -> !segment.isTombstone()) + .filter(segment -> segmentFilter.apply(segment.toDescriptor())) + .map(segment -> new DataSegmentWithInterval(segment, holder.getInterval())) ).iterator(); return DimFilterUtils.filterShards( diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java index b76479484220..697c65599a68 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java @@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.LoadableSegment; import org.apache.druid.msq.input.PhysicalInputSlice; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.DimFilter; import org.joda.time.Interval; @@ -44,6 +46,9 @@ public class TableInputSpec implements InputSpec private final String dataSource; private final List intervals; + @Nullable + private final List segments; + @Nullable private final DimFilter filter; @@ -58,6 +63,8 @@ public class TableInputSpec 
implements InputSpec * meaning that when this spec is sliced and read, the returned {@link LoadableSegment} * from {@link PhysicalInputSlice#getLoadableSegments()} are clipped to these intervals using * {@link LoadableSegment#descriptor()}. + * @param segments specific segments to read, or null to read all segments in the intervals. If provided, + * only these segments will be read. Must not be empty if non-null. * @param filter other filters to use for pruning, or null if no pruning is desired. Pruning filters are * *not strict*, which means that processors must re-apply them when processing the returned * {@link LoadableSegment} from {@link PhysicalInputSlice#getLoadableSegments()}. This matches how @@ -69,16 +76,35 @@ public class TableInputSpec implements InputSpec public TableInputSpec( @JsonProperty("dataSource") String dataSource, @JsonProperty("intervals") @Nullable List intervals, + @JsonProperty("segments") @Nullable List segments, @JsonProperty("filter") @Nullable DimFilter filter, @JsonProperty("filterFields") @Nullable Set filterFields ) { this.dataSource = dataSource; this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals; + if (segments != null && segments.isEmpty()) { + throw new IAE("Can not supply empty segments as input, please use either null or non-empty segments."); + } + this.segments = segments; this.filter = filter; this.filterFields = filterFields; } + /** + * @deprecated Use {@link #TableInputSpec(String, List, List, DimFilter, Set)} with explicit null for segments instead. + */ + @Deprecated + public TableInputSpec( + String dataSource, + @Nullable List intervals, + @Nullable DimFilter filter, + @Nullable Set filterFields + ) + { + this(dataSource, intervals, null, filter, filterFields); + } + @JsonProperty public String getDataSource() { @@ -99,6 +125,14 @@ private List getIntervalsForSerialization() return intervals.equals(Intervals.ONLY_ETERNITY) ? 
null : intervals; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public List getSegments() + { + return segments; + } + @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @@ -127,6 +161,7 @@ public boolean equals(Object o) TableInputSpec that = (TableInputSpec) o; return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals) + && Objects.equals(segments, that.segments) && Objects.equals(filter, that.filter) && Objects.equals(filterFields, that.filterFields); } @@ -134,7 +169,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(dataSource, intervals, filter, filterFields); + return Objects.hash(dataSource, intervals, segments, filter, filterFields); } @Override @@ -143,6 +178,7 @@ public String toString() return "TableInputSpec{" + "dataSource='" + dataSource + '\'' + ", intervals=" + intervals + + (segments == null ? "" : ", segments=" + segments) + (filter == null ? "" : ", filter=" + filter) + (filterFields == null ? 
"" : ", filterFields=" + filterFields) + '}'; diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java index 0ac2097a89f4..a4eaf099dfe2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java @@ -139,7 +139,7 @@ private static InputSpec translateDataSource(DataSource dataSource) { if (dataSource instanceof TableDataSource) { TableDataSource ids = (TableDataSource) dataSource; - TableInputSpec inputSpec = new TableInputSpec(ids.getName(), Intervals.ONLY_ETERNITY, null, null); + TableInputSpec inputSpec = new TableInputSpec(ids.getName(), Intervals.ONLY_ETERNITY, null, null, null); return inputSpec; } if (dataSource instanceof InlineDataSource) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index a2743cf9acb7..51850d7046ec 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -53,6 +53,7 @@ import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.RestrictedDataSource; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.UnnestDataSource; @@ -61,7 +62,9 @@ import org.apache.druid.query.planning.JoinDataSourceAnalysis; import org.apache.druid.query.planning.PreJoinableClause; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; +import 
org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinConditionAnalysis; @@ -121,6 +124,11 @@ public class DataSourcePlan } } + DataSourcePlan withDataSource(DataSource newDataSource) + { + return new DataSourcePlan(newDataSource, inputSpecs, broadcastInputs, subQueryDefBuilder); + } + /** * Build a plan. * @@ -161,7 +169,7 @@ public static DataSourcePlan forDataSource( if (dataSource instanceof TableDataSource) { return forTable( (TableDataSource) dataSource, - querySegmentSpecIntervals(querySegmentSpec), + querySegmentSpec, filter, filterFields, broadcast @@ -169,7 +177,7 @@ public static DataSourcePlan forDataSource( } else if (dataSource instanceof RestrictedDataSource) { return forRestricted( (RestrictedDataSource) dataSource, - querySegmentSpecIntervals(querySegmentSpec), + querySegmentSpec, filter, filterFields, broadcast @@ -360,15 +368,24 @@ public boolean isSingleWorker() private static DataSourcePlan forTable( final TableDataSource dataSource, - final List intervals, + final QuerySegmentSpec querySegmentSpec, @Nullable final DimFilter filter, @Nullable final Set filterFields, final boolean broadcast ) { + final List segments; + if (querySegmentSpec instanceof MultipleSpecificSegmentSpec) { + segments = ((MultipleSpecificSegmentSpec) querySegmentSpec).getDescriptors(); + } else if (querySegmentSpec instanceof SpecificSegmentSpec) { + segments = List.of(((SpecificSegmentSpec) querySegmentSpec).getDescriptor()); + } else { + segments = null; + } + List intervals = querySegmentSpec.getIntervals(); return new DataSourcePlan( (broadcast && dataSource.isGlobal()) ? dataSource : new InputNumberDataSource(0), - Collections.singletonList(new TableInputSpec(dataSource.getName(), intervals, filter, filterFields)), + List.of(new TableInputSpec(dataSource.getName(), intervals, segments, filter, filterFields)), broadcast ? 
IntOpenHashSet.of(0) : IntSets.emptySet(), null ); @@ -376,20 +393,16 @@ private static DataSourcePlan forTable( private static DataSourcePlan forRestricted( final RestrictedDataSource dataSource, - final List intervals, + final QuerySegmentSpec querySegmentSpec, @Nullable final DimFilter filter, @Nullable final Set filterFields, final boolean broadcast ) { - return new DataSourcePlan( - (broadcast && dataSource.isGlobal()) - ? dataSource - : new RestrictedInputNumberDataSource(0, dataSource.getPolicy()), - Collections.singletonList(new TableInputSpec(dataSource.getBase().getName(), intervals, filter, filterFields)), - broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), - null - ); + DataSource restricted = (broadcast && dataSource.isGlobal()) + ? dataSource + : new RestrictedInputNumberDataSource(0, dataSource.getPolicy()); + return forTable(dataSource.getBase(), querySegmentSpec, filter, filterFields, broadcast).withDataSource(restricted); } private static DataSourcePlan forExternal( @@ -791,7 +804,7 @@ private static List querySegmentSpecIntervals(final QuerySegmentSpec q /** * Verify that the provided {@link QuerySegmentSpec} is a {@link MultipleIntervalSegmentSpec} with * interval {@link Intervals#ETERNITY}. If not, throw an {@link UnsupportedOperationException}. - * + *

* We don't need to support this for anything that is not {@link DataSourceAnalysis#isTableBased()}, because * the SQL layer avoids "intervals" in other cases. See * {@link org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}. diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java index e64c94a883f4..b25416d79943 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java @@ -243,7 +243,7 @@ public void test_sliceDynamic() { // This slicer cannot sliceDynamic. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); Assertions.assertFalse(slicer.canSliceDynamic(inputSpec)); Assertions.assertThrows( UnsupportedOperationException.class, @@ -257,7 +257,7 @@ public void test_sliceStatic_wholeTable_oneSlice() // When 1 slice is requested, all segments are assigned to one server, even if that server doesn't actually // currently serve those segments. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 1); Assertions.assertEquals( ImmutableList.of( @@ -313,7 +313,7 @@ public void test_sliceStatic_wholeTable_twoSlices() { // When 2 slices are requested, we assign segments to the servers that have those segments. 
- final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 2); Assertions.assertEquals( ImmutableList.of( @@ -375,7 +375,7 @@ public void test_sliceStatic_wholeTable_threeSlices() { // When 3 slices are requested, only 2 are returned, because we only have two workers. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 3); Assertions.assertEquals( ImmutableList.of( @@ -436,7 +436,7 @@ public void test_sliceStatic_wholeTable_threeSlices() @Test public void test_sliceStatic_nonexistentTable() { - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 1); Assertions.assertEquals( Collections.emptyList(), @@ -452,6 +452,7 @@ public void test_sliceStatic_dimensionFilter_twoSlices() final TableInputSpec inputSpec = new TableInputSpec( DATASOURCE, null, + null, new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null), null ); @@ -515,6 +516,7 @@ public void test_sliceStatic_timeFilter_twoSlices() DATASOURCE, Collections.singletonList(Intervals.of("2000/P1Y")), null, + null, null ); @@ -559,7 +561,7 @@ void test_withoutRealtime_twoSlices() // When 2 slices are requested, we assign segments to the servers that have those segments. 
- final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 2); // Expect segment 2 and then the realtime segments 5 and 6 to be assigned round-robin. Assertions.assertEquals( diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java index 221e315c7d67..1b7510fd136c 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java @@ -63,7 +63,7 @@ public void test_getHasLeafInputs_broadcastTable() { Assert.assertFalse( InputSpecs.hasLeafInputs( - ImmutableList.of(new TableInputSpec("tbl", null, null, null)), + ImmutableList.of(new TableInputSpec("tbl", null, null, null, null)), IntSet.of(0) ) ); @@ -75,7 +75,7 @@ public void test_getHasLeafInputs_oneTableOneStage() Assert.assertTrue( InputSpecs.hasLeafInputs( ImmutableList.of( - new TableInputSpec("tbl", null, null, null), + new TableInputSpec("tbl", null, null, null, null), new StageInputSpec(0) ), IntSets.emptySet() @@ -89,7 +89,7 @@ public void test_getHasLeafInputs_oneTableOneBroadcastStage() Assert.assertTrue( InputSpecs.hasLeafInputs( ImmutableList.of( - new TableInputSpec("tbl", null, null, null), + new TableInputSpec("tbl", null, null, null, null), new StageInputSpec(0) ), IntSet.of(1) @@ -103,7 +103,7 @@ public void test_getHasLeafInputs_oneBroadcastTableOneStage() Assert.assertFalse( InputSpecs.hasLeafInputs( ImmutableList.of( - new TableInputSpec("tbl", null, null, null), + new TableInputSpec("tbl", null, null, null, null), new StageInputSpec(0) ), IntSet.of(0) @@ -117,8 +117,8 @@ public void test_getHasLeafInputs_oneTableOneBroadcastTable() Assert.assertTrue( InputSpecs.hasLeafInputs( ImmutableList.of( - new 
TableInputSpec("tbl", null, null, null), - new TableInputSpec("tbl2", null, null, null) + new TableInputSpec("tbl", null, null, null, null), + new TableInputSpec("tbl2", null, null, null, null) ), IntSet.of(1) ) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java index ac864419abed..28be6956a13a 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java @@ -29,6 +29,7 @@ import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.DataSegment; @@ -41,6 +42,7 @@ import org.junit.Test; import java.util.Collections; +import java.util.List; public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest { @@ -142,13 +144,13 @@ public RetType submit(TaskAction taskAction) @Test public void test_canSliceDynamic() { - Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, null, null, null))); + Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, null, null, null, null))); } @Test public void test_sliceStatic_noDataSource() { - final TableInputSpec spec = new TableInputSpec("no such datasource", null, null, null); + final TableInputSpec spec = new TableInputSpec("no such datasource", null, null, null, null); Assert.assertEquals( ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE), slicer.sliceStatic(spec, 2) @@ -165,6 +167,7 @@ public void test_sliceStatic_intervalFilter() 
Intervals.of("2000-06-01/P1M") ), null, + null, null ); @@ -212,6 +215,54 @@ public void test_sliceStatic_intervalFilterMatchesNothing() DATASOURCE, Collections.singletonList(Intervals.of("2002/P1M")), null, + null, + null + ); + + Assert.assertEquals( + ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE), + slicer.sliceStatic(spec, 2) + ); + } + + @Test + public void test_sliceStatic_segmentFilter() + { + final TableInputSpec spec = new TableInputSpec( + DATASOURCE, + null, + List.of(new SegmentDescriptor( + SEGMENT1.getInterval(), + SEGMENT1.getVersion(), + SEGMENT1.getShardSpec().getPartitionNum() + )), + null, + null + ); + + RichSegmentDescriptor expectedSegment = new RichSegmentDescriptor( + SEGMENT1.getInterval(), + SEGMENT1.getInterval(), + SEGMENT1.getVersion(), + SEGMENT1.getShardSpec().getPartitionNum() + ); + Assert.assertEquals( + List.of(new SegmentsInputSlice(DATASOURCE, List.of(expectedSegment), List.of())), + slicer.sliceStatic(spec, 1)); + } + + @Test + public void test_sliceStatic_segmentAndIntervalFilter() + { + final TableInputSpec spec = new TableInputSpec( + DATASOURCE, + List.of(Intervals.of("2002/P1M")), + List.of(new SegmentDescriptor( + SEGMENT1.getInterval(), + SEGMENT1.getVersion(), + SEGMENT1.getShardSpec().getPartitionNum() + )), + null, null ); @@ -227,6 +278,7 @@ public void test_sliceStatic_dimFilter() final TableInputSpec spec = new TableInputSpec( DATASOURCE, null, + null, new SelectorDimFilter("dim", "bar", null), null ); @@ -257,6 +309,7 @@ public void test_sliceStatic_dimFilterNotUsed() final TableInputSpec spec = new TableInputSpec( DATASOURCE, null, + null, new SelectorDimFilter("dim", "bar", null), Collections.emptySet() ); @@ -295,6 +348,7 @@ public void test_sliceStatic_intervalAndDimFilter() Intervals.of("2000/P1M"), Intervals.of("2000-06-01/P1M") ), + null, new SelectorDimFilter("dim", "bar", null), null ); @@ -333,7 +387,7 @@ public void test_sliceStatic_intervalAndDimFilter() @Test public void 
test_sliceStatic_oneSlice() { - final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null, null); Assert.assertEquals( Collections.singletonList( new SegmentsInputSlice( @@ -362,7 +416,7 @@ public void test_sliceStatic_oneSlice() @Test public void test_sliceStatic_needTwoSlices() { - final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null, null); Assert.assertEquals( ImmutableList.of( new SegmentsInputSlice( @@ -397,7 +451,7 @@ public void test_sliceStatic_needTwoSlices() @Test public void test_sliceStatic_threeSlices() { - final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null, null); Assert.assertEquals( ImmutableList.of( new SegmentsInputSlice( diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java index 2313063ac360..452003311db9 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.guice.MSQIndexingModule; import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.TestHelper; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -43,6 +44,7 @@ public void testSerde() throws Exception final TableInputSpec spec = new TableInputSpec( "myds", Collections.singletonList(Intervals.of("2000/P1M")), + null, new SelectorDimFilter("dim", "val", null), 
Collections.singleton("dim") ); @@ -62,6 +64,7 @@ public void testSerdeEmptyFilterFields() throws Exception final TableInputSpec spec = new TableInputSpec( "myds", Collections.singletonList(Intervals.of("2000/P1M")), + null, new SelectorDimFilter("dim", "val", null), Collections.emptySet() ); @@ -81,6 +84,7 @@ public void testSerdeEternityInterval() throws Exception final TableInputSpec spec = new TableInputSpec( "myds", Intervals.ONLY_ETERNITY, + null, new SelectorDimFilter("dim", "val", null), null ); @@ -91,6 +95,26 @@ public void testSerdeEternityInterval() throws Exception ); } + @Test + public void testSerdeWithSegments() throws Exception + { + final ObjectMapper mapper = TestHelper.makeJsonMapper() + .registerModules(new MSQIndexingModule().getJacksonModules()); + + final TableInputSpec spec = new TableInputSpec( + "myds", + Collections.singletonList(Intervals.of("2000/P1M")), + Collections.singletonList(new SegmentDescriptor(Intervals.of("2000/P1M"), "version", 0)), + new SelectorDimFilter("dim", "val", null), + Collections.singleton("dim") + ); + + Assert.assertEquals( + spec, + mapper.readValue(mapper.writeValueAsString(spec), InputSpec.class) + ); + } + @Test public void testEquals() {