From f13c9534a7a6a5333302b2bbdea1f58eeb29cf1a Mon Sep 17 00:00:00 2001 From: cecemei Date: Thu, 15 Jan 2026 18:39:44 -0800 Subject: [PATCH 1/4] input --- .../indexing/IndexerTableInputSpecSlicer.java | 85 +++++++++++-------- .../druid/msq/input/table/TableInputSpec.java | 35 +++++++- .../druid/msq/querykit/DataSourcePlan.java | 38 ++++++--- .../msq/input/table/TableInputSpecTest.java | 21 +++++ 4 files changed, 128 insertions(+), 51 deletions(-) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java index 48283bdd78a2..b87952d18d06 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java @@ -22,8 +22,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.logger.Logger; @@ -41,18 +43,19 @@ import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -155,26 +158,21 @@ public List sliceDynamic( private Set getPrunedSegmentSet(final TableInputSpec tableInputSpec) { - final TimelineLookup timeline = - getTimeline(tableInputSpec.getDataSource(), tableInputSpec.getIntervals()); + final TimelineLookup timeline = getTimeline(tableInputSpec); if (timeline == null) { return Collections.emptySet(); } else { + // A segment can overlap with multiple search intervals, thus the same segment can appear multiple times, but each is also bounded within the overlap search interval final Iterator dataSegmentIterator = tableInputSpec.getIntervals().stream() .flatMap(interval -> timeline.lookup(interval).stream()) .flatMap( holder -> StreamSupport.stream(holder.getObject().spliterator(), false) - .filter(chunk -> !chunk.getObject().isTombstone()) - .map( - chunk -> - new DataSegmentWithInterval( - chunk.getObject(), - holder.getInterval() - ) - ) + .map(PartitionChunk::getObject) + .filter(segment -> !segment.isTombstone()) + .map(segment -> new DataSegmentWithInterval(segment, holder.getInterval())) ).iterator(); return DimFilterUtils.filterShards( @@ -188,65 +186,80 @@ private Set getPrunedSegmentSet(final TableInputSpec ta } @Nullable - private VersionedIntervalTimeline getTimeline( - final String dataSource, - final List intervals - ) + private VersionedIntervalTimeline getTimeline(TableInputSpec inputSpec) { + String dataSource = inputSpec.getDataSource(); + List intervals = inputSpec.getIntervals(); + Set segments = inputSpec.getSegments() == null ? null : Set.copyOf(inputSpec.getSegments()); + final boolean includeRealtime = SegmentSource.shouldQueryRealtimeServers(includeSegmentSource); - final Iterable realtimeAndHistoricalSegments; + final Iterable realtimeSegments; - // Fetch the realtime segments and segments loaded on the historical. Do this first so that we don't miss any - // segment if they get handed off between the two calls. Segments loaded on historicals are deduplicated below, + // Fetch the realtime segments. Do this first so that we don't miss any segment if they get handed off between the two calls. + // Segments loaded on historicals are deduplicated below, // since we are only interested in realtime segments for now. if (includeRealtime) { - realtimeAndHistoricalSegments = coordinatorClient.fetchServerViewSegments(dataSource, intervals); + realtimeSegments = coordinatorClient.fetchServerViewSegments(dataSource, intervals); } else { - realtimeAndHistoricalSegments = ImmutableList.of(); + realtimeSegments = ImmutableList.of(); } // Fetch all published, used segments (all non-realtime segments) from the metadata store. // If the task is operating with a REPLACE lock, // any segment created after the lock was acquired for its interval will not be considered. - final Collection publishedUsedSegments; + final Set publishedUsedSegments; try { // Additional check as the task action does not accept empty intervals if (intervals.isEmpty()) { publishedUsedSegments = Collections.emptySet(); } else { - publishedUsedSegments = - taskActionClient.submit(new RetrieveUsedSegmentsAction(dataSource, intervals)); + publishedUsedSegments = taskActionClient.submit(new RetrieveUsedSegmentsAction(dataSource, intervals)) + .stream() + .filter(s -> segments == null || segments.contains(s.toDescriptor())) + .collect(Collectors.toSet()); } } catch (IOException e) { throw new MSQException(e, UnknownFault.forException(e)); } - int realtimeCount = 0; + if (segments != null && segments.size() != publishedUsedSegments.size()) { + Set missingSegments = + Sets.difference( + segments, + publishedUsedSegments.stream().map(DataSegment::toDescriptor).collect(Collectors.toSet()) + ); + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Missing [%d]segments, it could be outdated: %s", + missingSegments.size(), + missingSegments + ); + } - // Deduplicate segments, giving preference to published used segments. - // We do this so that if any segments have been handed off in between the two metadata calls above, - // we directly fetch it from deep storage. + // Giving preference to published used segments first, Set.add doesn't replace segment if already exist. + // If any segments have been handed off in between the two metadata calls above, we directly fetch it from deep storage. Set unifiedSegmentView = new HashSet<>(publishedUsedSegments); + int realtimeCount = 0; // Iterate over the realtime segments and segments loaded on the historical - for (ImmutableSegmentLoadInfo segmentLoadInfo : realtimeAndHistoricalSegments) { + for (ImmutableSegmentLoadInfo segmentLoadInfo : realtimeSegments) { Set servers = segmentLoadInfo.getServers(); // Filter out only realtime servers. We don't want to query historicals for now, but we can in the future. // This check can be modified then. - Set realtimeServerMetadata - = servers.stream() - .filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes() - .contains(druidServerMetadata.getType()) - ) - .collect(Collectors.toSet()); + Set realtimeServerMetadata = + servers.stream() + .filter(serverMetadata -> includeSegmentSource.getUsedServerTypes().contains(serverMetadata.getType())) + .collect(Collectors.toSet()); if (!realtimeServerMetadata.isEmpty()) { - realtimeCount += 1; DataSegmentWithLocation dataSegmentWithLocation = new DataSegmentWithLocation( segmentLoadInfo.getSegment(), realtimeServerMetadata ); - unifiedSegmentView.add(dataSegmentWithLocation); + if (unifiedSegmentView.add(dataSegmentWithLocation)) { + realtimeCount += 1; + } } else { // We don't have any segments of the required segment source, ignore the segment } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java index b76479484220..fdfd8bece4dd 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java @@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.LoadableSegment; import org.apache.druid.msq.input.PhysicalInputSlice; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.DimFilter; import org.joda.time.Interval; @@ -44,6 +46,9 @@ public class TableInputSpec implements InputSpec private final String dataSource; private final List intervals; + @Nullable + private final List segments; + @Nullable private final DimFilter filter; @@ -58,6 +63,8 @@ public class TableInputSpec implements InputSpec * meaning that when this spec is sliced and read, the returned {@link LoadableSegment} * from {@link PhysicalInputSlice#getLoadableSegments()} are clipped to these intervals using * {@link LoadableSegment#descriptor()}. + * @param segments specific segments to read, or null to read all segments in the intervals. If provided, + * only these segments will be read. Must not be empty if non-null. * @param filter other filters to use for pruning, or null if no pruning is desired. Pruning filters are * *not strict*, which means that processors must re-apply them when processing the returned * {@link LoadableSegment} from {@link PhysicalInputSlice#getLoadableSegments()}. This matches how @@ -69,16 +76,32 @@ public class TableInputSpec implements InputSpec public TableInputSpec( @JsonProperty("dataSource") String dataSource, @JsonProperty("intervals") @Nullable List intervals, + @JsonProperty("segments") @Nullable List segments, @JsonProperty("filter") @Nullable DimFilter filter, @JsonProperty("filterFields") @Nullable Set filterFields ) { this.dataSource = dataSource; this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals; + if (segments != null && segments.isEmpty()) { + throw DruidException.defensive( + "Can not supply empty segments as input, please use either null or non-empty segments."); + } + this.segments = segments; this.filter = filter; this.filterFields = filterFields; } + public TableInputSpec( + String dataSource, + @Nullable List intervals, + @Nullable DimFilter filter, + @Nullable Set filterFields + ) + { + this(dataSource, intervals, null, filter, filterFields); + } + @JsonProperty public String getDataSource() { @@ -99,6 +122,14 @@ private List getIntervalsForSerialization() return intervals.equals(Intervals.ONLY_ETERNITY) ? null : intervals; } + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public List getSegments() + { + return segments; + } + @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @@ -127,6 +158,7 @@ public boolean equals(Object o) TableInputSpec that = (TableInputSpec) o; return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals) + && Objects.equals(segments, that.segments) && Objects.equals(filter, that.filter) && Objects.equals(filterFields, that.filterFields); } @@ -134,7 +166,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(dataSource, intervals, filter, filterFields); + return Objects.hash(dataSource, intervals, segments, filter, filterFields); } @Override @@ -143,6 +175,7 @@ public String toString() return "TableInputSpec{" + "dataSource='" + dataSource + '\'' + ", intervals=" + intervals + + (segments == null ? "" : ", segments=" + segments) + (filter == null ? "" : ", filter=" + filter) + (filterFields == null ? "" : ", filterFields=" + filterFields) + '}'; diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index a2743cf9acb7..ced5fb893ebd 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -53,6 +53,7 @@ import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.RestrictedDataSource; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.UnnestDataSource; @@ -61,6 +62,7 @@ import org.apache.druid.query.planning.JoinDataSourceAnalysis; import org.apache.druid.query.planning.PreJoinableClause; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; @@ -121,6 +123,11 @@ public class DataSourcePlan } } + DataSourcePlan withDataSource(DataSource newDataSource) + { + return new DataSourcePlan(newDataSource, inputSpecs, broadcastInputs, subQueryDefBuilder); + } + /** * Build a plan. * @@ -161,7 +168,7 @@ public static DataSourcePlan forDataSource( if (dataSource instanceof TableDataSource) { return forTable( (TableDataSource) dataSource, - querySegmentSpecIntervals(querySegmentSpec), + querySegmentSpec, filter, filterFields, broadcast @@ -169,7 +176,7 @@ public static DataSourcePlan forDataSource( } else if (dataSource instanceof RestrictedDataSource) { return forRestricted( (RestrictedDataSource) dataSource, - querySegmentSpecIntervals(querySegmentSpec), + querySegmentSpec, filter, filterFields, broadcast @@ -360,15 +367,22 @@ public boolean isSingleWorker() private static DataSourcePlan forTable( final TableDataSource dataSource, - final List intervals, + final QuerySegmentSpec querySegmentSpec, @Nullable final DimFilter filter, @Nullable final Set filterFields, final boolean broadcast ) { + final List segments; + if (querySegmentSpec instanceof MultipleSpecificSegmentSpec) { + segments = ((MultipleSpecificSegmentSpec) querySegmentSpec).getDescriptors(); + } else { + segments = null; + } + List intervals = querySegmentSpec.getIntervals(); return new DataSourcePlan( (broadcast && dataSource.isGlobal()) ? dataSource : new InputNumberDataSource(0), - Collections.singletonList(new TableInputSpec(dataSource.getName(), intervals, filter, filterFields)), + List.of(new TableInputSpec(dataSource.getName(), intervals, segments, filter, filterFields)), broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), null ); @@ -376,20 +390,16 @@ private static DataSourcePlan forTable( private static DataSourcePlan forRestricted( final RestrictedDataSource dataSource, - final List intervals, + final QuerySegmentSpec querySegmentSpec, @Nullable final DimFilter filter, @Nullable final Set filterFields, final boolean broadcast ) { - return new DataSourcePlan( - (broadcast && dataSource.isGlobal()) - ? dataSource - : new RestrictedInputNumberDataSource(0, dataSource.getPolicy()), - Collections.singletonList(new TableInputSpec(dataSource.getBase().getName(), intervals, filter, filterFields)), - broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), - null - ); + DataSource restricted = (broadcast && dataSource.isGlobal()) + ? dataSource + : new RestrictedInputNumberDataSource(0, dataSource.getPolicy()); + return forTable(dataSource.getBase(), querySegmentSpec, filter, filterFields, broadcast).withDataSource(restricted); } private static DataSourcePlan forExternal( @@ -791,7 +801,7 @@ private static List querySegmentSpecIntervals(final QuerySegmentSpec q /** * Verify that the provided {@link QuerySegmentSpec} is a {@link MultipleIntervalSegmentSpec} with * interval {@link Intervals#ETERNITY}. If not, throw an {@link UnsupportedOperationException}. - * + *

* We don't need to support this for anything that is not {@link DataSourceAnalysis#isTableBased()}, because * the SQL layer avoids "intervals" in other cases. See * {@link org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}. diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java index 2313063ac360..f5fd13cc4a94 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.guice.MSQIndexingModule; import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.TestHelper; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -91,6 +92,26 @@ public void testSerdeEternityInterval() throws Exception ); } + @Test + public void testSerdeWithSegments() throws Exception + { + final ObjectMapper mapper = TestHelper.makeJsonMapper() + .registerModules(new MSQIndexingModule().getJacksonModules()); + + final TableInputSpec spec = new TableInputSpec( + "myds", + Collections.singletonList(Intervals.of("2000/P1M")), + Collections.singletonList(new SegmentDescriptor(Intervals.of("2000/P1M"), "version", 0)), + new SelectorDimFilter("dim", "val", null), + Collections.singleton("dim") + ); + + Assert.assertEquals( + spec, + mapper.readValue(mapper.writeValueAsString(spec), InputSpec.class) + ); + } + @Test public void testEquals() { From bbdeaaf6a17c69c5421073d9e270d1f5f02407d7 Mon Sep 17 00:00:00 2001 From: cecemei Date: Fri, 16 Jan 2026 12:27:41 -0800 Subject: [PATCH 2/4] format and deprecate --- .../indexing/IndexerTableInputSpecSlicer.java | 23 +++++++++++++++---- .../druid/msq/input/table/TableInputSpec.java | 4 ++++ .../druid/msq/logical/stages/ReadStage.java | 2 +- .../druid/msq/querykit/DataSourcePlan.java | 3 +++ .../DartTableInputSpecSlicerTest.java | 14 ++++++----- .../druid/msq/input/InputSpecsTest.java | 12 +++++----- .../IndexerTableInputSpecSlicerTest.java | 15 ++++++++---- .../msq/input/table/TableInputSpecTest.java | 3 +++ 8 files changed, 53 insertions(+), 23 deletions(-) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java index b87952d18d06..0078781be96c 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java @@ -163,7 +163,8 @@ private Set getPrunedSegmentSet(final TableInputSpec ta if (timeline == null) { return Collections.emptySet(); } else { - // A segment can overlap with multiple search intervals, thus the same segment can appear multiple times, but each is also bounded within the overlap search interval + // A segment can overlap with multiple search intervals, or even outside search intervals. + // The same segment can appear multiple times or 0 time, but each is also bounded within the overlapped search interval final Iterator dataSegmentIterator = tableInputSpec.getIntervals().stream() .flatMap(interval -> timeline.lookup(interval).stream()) @@ -185,6 +186,21 @@ private Set getPrunedSegmentSet(final TableInputSpec ta } } + /** + * Builds a timeline of segments for the given {@link TableInputSpec} by combining segments from both + * realtime servers and the metadata store. + *

+ * Realtime segments are fetched first to avoid missing segments that may be handed off between the two calls. + * When a segment appears in both sources, the published version is used. + *

+ * If the task is operating with a REPLACE lock, only segments that existed before the lock was acquired + * will be considered. + * + * @param inputSpec the table input specification containing datasource, intervals, and optional specific segments + * @return a timeline containing all matching segments, or null if no segments are found + * @throws MSQException if there's an IO error fetching segments from the metadata store + * @throws DruidException if specific segments were requested but some are missing or outdated + */ @Nullable private VersionedIntervalTimeline getTimeline(TableInputSpec inputSpec) { @@ -194,10 +210,7 @@ private VersionedIntervalTimeline getTimeline(TableInputSpe final boolean includeRealtime = SegmentSource.shouldQueryRealtimeServers(includeSegmentSource); final Iterable realtimeSegments; - // Fetch the realtime segments. Do this first so that we don't miss any segment if they get handed off between the two calls. - // Segments loaded on historicals are deduplicated below, - // since we are only interested in realtime segments for now. if (includeRealtime) { realtimeSegments = coordinatorClient.fetchServerViewSegments(dataSource, intervals); } else { @@ -243,7 +256,7 @@ private VersionedIntervalTimeline getTimeline(TableInputSpe Set unifiedSegmentView = new HashSet<>(publishedUsedSegments); int realtimeCount = 0; - // Iterate over the realtime segments and segments loaded on the historical + // Iterate over the realtime segments to attach server metadata for (ImmutableSegmentLoadInfo segmentLoadInfo : realtimeSegments) { Set servers = segmentLoadInfo.getServers(); // Filter out only realtime servers. We don't want to query historicals for now, but we can in the future. diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java index fdfd8bece4dd..d973c290ca54 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java @@ -92,6 +92,10 @@ public TableInputSpec( this.filterFields = filterFields; } + /** + * @deprecated Use {@link #TableInputSpec(String, List, List, DimFilter, Set)} with explicit null for segments instead. + */ + @Deprecated public TableInputSpec( String dataSource, @Nullable List intervals, diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java index 0ac2097a89f4..a4eaf099dfe2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java @@ -139,7 +139,7 @@ private static InputSpec translateDataSource(DataSource dataSource) { if (dataSource instanceof TableDataSource) { TableDataSource ids = (TableDataSource) dataSource; - TableInputSpec inputSpec = new TableInputSpec(ids.getName(), Intervals.ONLY_ETERNITY, null, null); + TableInputSpec inputSpec = new TableInputSpec(ids.getName(), Intervals.ONLY_ETERNITY, null, null, null); return inputSpec; } if (dataSource instanceof InlineDataSource) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index ced5fb893ebd..51850d7046ec 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -64,6 +64,7 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinConditionAnalysis; @@ -376,6 +377,8 @@ private static DataSourcePlan forTable( final List segments; if (querySegmentSpec instanceof MultipleSpecificSegmentSpec) { segments = ((MultipleSpecificSegmentSpec) querySegmentSpec).getDescriptors(); + } else if (querySegmentSpec instanceof SpecificSegmentSpec) { + segments = List.of(((SpecificSegmentSpec) querySegmentSpec).getDescriptor()); } else { segments = null; } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java index e64c94a883f4..b25416d79943 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java @@ -243,7 +243,7 @@ public void test_sliceDynamic() { // This slicer cannot sliceDynamic. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); Assertions.assertFalse(slicer.canSliceDynamic(inputSpec)); Assertions.assertThrows( UnsupportedOperationException.class, @@ -257,7 +257,7 @@ public void test_sliceStatic_wholeTable_oneSlice() // When 1 slice is requested, all segments are assigned to one server, even if that server doesn't actually // currently serve those segments. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 1); Assertions.assertEquals( ImmutableList.of( @@ -313,7 +313,7 @@ public void test_sliceStatic_wholeTable_twoSlices() { // When 2 slices are requested, we assign segments to the servers that have those segments. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 2); Assertions.assertEquals( ImmutableList.of( @@ -375,7 +375,7 @@ public void test_sliceStatic_wholeTable_threeSlices() { // When 3 slices are requested, only 2 are returned, because we only have two workers. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 3); Assertions.assertEquals( ImmutableList.of( @@ -436,7 +436,7 @@ public void test_sliceStatic_wholeTable_threeSlices() @Test public void test_sliceStatic_nonexistentTable() { - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 1); Assertions.assertEquals( Collections.emptyList(), @@ -452,6 +452,7 @@ public void test_sliceStatic_dimensionFilter_twoSlices() final TableInputSpec inputSpec = new TableInputSpec( DATASOURCE, null, + null, new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null), null ); @@ -515,6 +516,7 @@ public void test_sliceStatic_timeFilter_twoSlices() DATASOURCE, Collections.singletonList(Intervals.of("2000/P1Y")), null, + null, null ); @@ -559,7 +561,7 @@ void test_withoutRealtime_twoSlices() // When 2 slices are requested, we assign segments to the servers that have those segments. - final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null); final List inputSlices = slicer.sliceStatic(inputSpec, 2); // Expect segment 2 and then the realtime segments 5 and 6 to be assigned round-robin. Assertions.assertEquals( diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java index 221e315c7d67..1b7510fd136c 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/InputSpecsTest.java @@ -63,7 +63,7 @@ public void test_getHasLeafInputs_broadcastTable() { Assert.assertFalse( InputSpecs.hasLeafInputs( - ImmutableList.of(new TableInputSpec("tbl", null, null, null)), + ImmutableList.of(new TableInputSpec("tbl", null, null, null, null)), IntSet.of(0) ) ); @@ -75,7 +75,7 @@ public void test_getHasLeafInputs_oneTableOneStage() Assert.assertTrue( InputSpecs.hasLeafInputs( ImmutableList.of( - new TableInputSpec("tbl", null, null, null), + new TableInputSpec("tbl", null, null, null, null), new StageInputSpec(0) ), IntSets.emptySet() @@ -89,7 +89,7 @@ public void test_getHasLeafInputs_oneTableOneBroadcastStage() Assert.assertTrue( InputSpecs.hasLeafInputs( ImmutableList.of( - new TableInputSpec("tbl", null, null, null), + new TableInputSpec("tbl", null, null, null, null), new StageInputSpec(0) ), IntSet.of(1) @@ -103,7 +103,7 @@ public void test_getHasLeafInputs_oneBroadcastTableOneStage() Assert.assertFalse( InputSpecs.hasLeafInputs( ImmutableList.of( - new TableInputSpec("tbl", null, null, null), + new TableInputSpec("tbl", null, null, null, null), new StageInputSpec(0) ), IntSet.of(0) @@ -117,8 +117,8 @@ public void test_getHasLeafInputs_oneTableOneBroadcastTable() Assert.assertTrue( InputSpecs.hasLeafInputs( ImmutableList.of( - new TableInputSpec("tbl", null, null, null), - new TableInputSpec("tbl2", null, null, null) + new TableInputSpec("tbl", null, null, null, null), + new TableInputSpec("tbl2", null, null, null, null) ), IntSet.of(1) ) diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java index ac864419abed..7393f0cf901a 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java @@ -142,13 +142,13 @@ public RetType submit(TaskAction taskAction) @Test public void test_canSliceDynamic() { - Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, null, null, null))); + Assert.assertTrue(slicer.canSliceDynamic(new TableInputSpec(DATASOURCE, null, null, null, null))); } @Test public void test_sliceStatic_noDataSource() { - final TableInputSpec spec = new TableInputSpec("no such datasource", null, null, null); + final TableInputSpec spec = new TableInputSpec("no such datasource", null, null, null, null); Assert.assertEquals( ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE), slicer.sliceStatic(spec, 2) @@ -165,6 +165,7 @@ public void test_sliceStatic_intervalFilter() Intervals.of("2000-06-01/P1M") ), null, + null, null ); @@ -212,6 +213,7 @@ public void test_sliceStatic_intervalFilterMatchesNothing() DATASOURCE, Collections.singletonList(Intervals.of("2002/P1M")), null, + null, null ); @@ -227,6 +229,7 @@ public void test_sliceStatic_dimFilter() final TableInputSpec spec = new TableInputSpec( DATASOURCE, null, + null, new SelectorDimFilter("dim", "bar", null), null ); @@ -257,6 +260,7 @@ public void test_sliceStatic_dimFilterNotUsed() final TableInputSpec spec = new TableInputSpec( DATASOURCE, null, + null, new SelectorDimFilter("dim", "bar", null), Collections.emptySet() ); @@ -295,6 +299,7 @@ public void test_sliceStatic_intervalAndDimFilter() Intervals.of("2000/P1M"), Intervals.of("2000-06-01/P1M") ), + null, new SelectorDimFilter("dim", "bar", null), null ); @@ -333,7 +338,7 @@ public void test_sliceStatic_intervalAndDimFilter() @Test public void test_sliceStatic_oneSlice() { - final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null, null); Assert.assertEquals( Collections.singletonList( new SegmentsInputSlice( @@ -362,7 +367,7 @@ public void test_sliceStatic_oneSlice() @Test public void test_sliceStatic_needTwoSlices() { - final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null, null); Assert.assertEquals( ImmutableList.of( new SegmentsInputSlice( @@ -397,7 +402,7 @@ public void test_sliceStatic_needTwoSlices() @Test public void test_sliceStatic_threeSlices() { - final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null); + final TableInputSpec spec = new TableInputSpec(DATASOURCE, null, null, null, null); Assert.assertEquals( ImmutableList.of( new SegmentsInputSlice( diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java index f5fd13cc4a94..452003311db9 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/TableInputSpecTest.java @@ -44,6 +44,7 @@ public void testSerde() throws Exception final TableInputSpec spec = new TableInputSpec( "myds", Collections.singletonList(Intervals.of("2000/P1M")), + null, new SelectorDimFilter("dim", "val", null), Collections.singleton("dim") ); @@ -63,6 +64,7 @@ public void testSerdeEmptyFilterFields() throws Exception final TableInputSpec spec = new TableInputSpec( "myds", Collections.singletonList(Intervals.of("2000/P1M")), + null, new SelectorDimFilter("dim", "val", null), Collections.emptySet() ); @@ -82,6 +84,7 @@ public void testSerdeEternityInterval() throws Exception final TableInputSpec spec = new TableInputSpec( "myds", Intervals.ONLY_ETERNITY, + null, new SelectorDimFilter("dim", "val", null), null ); From 7af94c0b40fd97fbdf1b7ef1736bf7ec3a1e6f09 Mon Sep 17 00:00:00 2001 From: cecemei Date: Mon, 26 Jan 2026 16:50:29 -0800 Subject: [PATCH 3/4] allow non-complete segments --- .../indexing/IndexerTableInputSpecSlicer.java | 95 ++++++++----------- 1 file changed, 37 insertions(+), 58 deletions(-) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java index 0078781be96c..a7b8cb6b0a81 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicer.java @@ -20,12 +20,12 @@ package org.apache.druid.msq.indexing; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; -import com.google.common.collect.Sets; import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.logger.Logger; @@ -56,6 +56,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -158,7 +159,11 @@ public List sliceDynamic( private Set getPrunedSegmentSet(final TableInputSpec tableInputSpec) { - final TimelineLookup timeline = getTimeline(tableInputSpec); + final TimelineLookup timeline = + getTimeline(tableInputSpec.getDataSource(), tableInputSpec.getIntervals()); + final Predicate segmentFilter = tableInputSpec.getSegments() != null + ? Set.copyOf(tableInputSpec.getSegments())::contains + : Predicates.alwaysTrue(); if (timeline == null) { return Collections.emptySet(); @@ -173,6 +178,7 @@ private Set getPrunedSegmentSet(final TableInputSpec ta StreamSupport.stream(holder.getObject().spliterator(), false) .map(PartitionChunk::getObject) .filter(segment -> !segment.isTombstone()) + .filter(segment -> segmentFilter.apply(segment.toDescriptor())) .map(segment -> new DataSegmentWithInterval(segment, holder.getInterval())) ).iterator(); @@ -186,93 +192,66 @@ private Set getPrunedSegmentSet(final TableInputSpec ta } } - /** - * Builds a timeline of segments for the given {@link TableInputSpec} by combining segments from both - * realtime servers and the metadata store. - *

- * Realtime segments are fetched first to avoid missing segments that may be handed off between the two calls. - * When a segment appears in both sources, the published version is used. - *

- * If the task is operating with a REPLACE lock, only segments that existed before the lock was acquired - * will be considered. - * - * @param inputSpec the table input specification containing datasource, intervals, and optional specific segments - * @return a timeline containing all matching segments, or null if no segments are found - * @throws MSQException if there's an IO error fetching segments from the metadata store - * @throws DruidException if specific segments were requested but some are missing or outdated - */ @Nullable - private VersionedIntervalTimeline getTimeline(TableInputSpec inputSpec) + private VersionedIntervalTimeline getTimeline( + final String dataSource, + final List intervals + ) { - String dataSource = inputSpec.getDataSource(); - List intervals = inputSpec.getIntervals(); - Set segments = inputSpec.getSegments() == null ? null : Set.copyOf(inputSpec.getSegments()); - final boolean includeRealtime = SegmentSource.shouldQueryRealtimeServers(includeSegmentSource); - final Iterable realtimeSegments; - // Fetch the realtime segments. Do this first so that we don't miss any segment if they get handed off between the two calls. + final Iterable realtimeAndHistoricalSegments; + + // Fetch the realtime segments and segments loaded on the historical. Do this first so that we don't miss any + // segment if they get handed off between the two calls. Segments loaded on historicals are deduplicated below, + // since we are only interested in realtime segments for now. if (includeRealtime) { - realtimeSegments = coordinatorClient.fetchServerViewSegments(dataSource, intervals); + realtimeAndHistoricalSegments = coordinatorClient.fetchServerViewSegments(dataSource, intervals); } else { - realtimeSegments = ImmutableList.of(); + realtimeAndHistoricalSegments = ImmutableList.of(); } // Fetch all published, used segments (all non-realtime segments) from the metadata store. // If the task is operating with a REPLACE lock, // any segment created after the lock was acquired for its interval will not be considered. - final Set publishedUsedSegments; + final Collection publishedUsedSegments; try { // Additional check as the task action does not accept empty intervals if (intervals.isEmpty()) { publishedUsedSegments = Collections.emptySet(); } else { - publishedUsedSegments = taskActionClient.submit(new RetrieveUsedSegmentsAction(dataSource, intervals)) - .stream() - .filter(s -> segments == null || segments.contains(s.toDescriptor())) - .collect(Collectors.toSet()); + publishedUsedSegments = + taskActionClient.submit(new RetrieveUsedSegmentsAction(dataSource, intervals)); } } catch (IOException e) { throw new MSQException(e, UnknownFault.forException(e)); } - if (segments != null && segments.size() != publishedUsedSegments.size()) { - Set missingSegments = - Sets.difference( - segments, - publishedUsedSegments.stream().map(DataSegment::toDescriptor).collect(Collectors.toSet()) - ); - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - "Missing [%d]segments, it could be outdated: %s", - missingSegments.size(), - missingSegments - ); - } + int realtimeCount = 0; - // Giving preference to published used segments first, Set.add doesn't replace segment if already exist. - // If any segments have been handed off in between the two metadata calls above, we directly fetch it from deep storage. + // Deduplicate segments, giving preference to published used segments. + // We do this so that if any segments have been handed off in between the two metadata calls above, + // we directly fetch it from deep storage. Set unifiedSegmentView = new HashSet<>(publishedUsedSegments); - int realtimeCount = 0; - // Iterate over the realtime segments to attach server metadata - for (ImmutableSegmentLoadInfo segmentLoadInfo : realtimeSegments) { + // Iterate over the realtime segments and segments loaded on the historical + for (ImmutableSegmentLoadInfo segmentLoadInfo : realtimeAndHistoricalSegments) { Set servers = segmentLoadInfo.getServers(); // Filter out only realtime servers. We don't want to query historicals for now, but we can in the future. // This check can be modified then. - Set realtimeServerMetadata = - servers.stream() - .filter(serverMetadata -> includeSegmentSource.getUsedServerTypes().contains(serverMetadata.getType())) - .collect(Collectors.toSet()); + Set realtimeServerMetadata + = servers.stream() + .filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes() + .contains(druidServerMetadata.getType()) + ) + .collect(Collectors.toSet()); if (!realtimeServerMetadata.isEmpty()) { + realtimeCount += 1; DataSegmentWithLocation dataSegmentWithLocation = new DataSegmentWithLocation( segmentLoadInfo.getSegment(), realtimeServerMetadata ); - if (unifiedSegmentView.add(dataSegmentWithLocation)) { - realtimeCount += 1; - } + unifiedSegmentView.add(dataSegmentWithLocation); } else { // We don't have any segments of the required segment source, ignore the segment } From 7200777b04760b0dce76842ea6b32e90927290af Mon Sep 17 00:00:00 2001 From: cecemei Date: Mon, 26 Jan 2026 20:37:01 -0800 Subject: [PATCH 4/4] test --- .../druid/msq/input/table/TableInputSpec.java | 5 +- .../IndexerTableInputSpecSlicerTest.java | 49 +++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java index d973c290ca54..697c65599a68 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpec.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.LoadableSegment; @@ -84,8 +84,7 @@ public TableInputSpec( this.dataSource = dataSource; this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals; if (segments != null && segments.isEmpty()) { - throw DruidException.defensive( - "Can not supply empty segments as input, please use either null or non-empty segments."); + throw new IAE("Can not supply empty segments as input, please use either null or non-empty segments."); } this.segments = segments; this.filter = filter; diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java index 7393f0cf901a..28be6956a13a 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java @@ -29,6 +29,7 @@ import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; +import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.DataSegment; @@ -41,6 +42,7 @@ import org.junit.Test; import java.util.Collections; +import java.util.List; public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest { @@ -223,6 +225,53 @@ public void test_sliceStatic_intervalFilterMatchesNothing() ); } + @Test + public void test_sliceStatic_segmentFilter() + { + final TableInputSpec spec = new TableInputSpec( + DATASOURCE, + null, + List.of(new SegmentDescriptor( + SEGMENT1.getInterval(), + SEGMENT1.getVersion(), + SEGMENT1.getShardSpec().getPartitionNum() + )), + null, + null + ); + + RichSegmentDescriptor expectedSegment = new RichSegmentDescriptor( + SEGMENT1.getInterval(), + SEGMENT1.getInterval(), + SEGMENT1.getVersion(), + SEGMENT1.getShardSpec().getPartitionNum() + ); + Assert.assertEquals( + List.of(new SegmentsInputSlice(DATASOURCE, List.of(expectedSegment), List.of())), + slicer.sliceStatic(spec, 1)); + } + + @Test + public void test_sliceStatic_segmentAndIntervalFilter() + { + final TableInputSpec spec = new TableInputSpec( + DATASOURCE, + List.of(Intervals.of("2002/P1M")), + List.of(new SegmentDescriptor( + SEGMENT1.getInterval(), + SEGMENT1.getVersion(), + SEGMENT1.getShardSpec().getPartitionNum() + )), + null, + null + ); + + Assert.assertEquals( + ImmutableList.of(NilInputSlice.INSTANCE, NilInputSlice.INSTANCE), + slicer.sliceStatic(spec, 2) + ); + } + @Test public void test_sliceStatic_dimFilter() {