Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.druid.msq.indexing;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
Expand All @@ -41,12 +43,14 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -157,24 +161,25 @@ private Set<DataSegmentWithInterval> getPrunedSegmentSet(final TableInputSpec ta
{
final TimelineLookup<String, DataSegment> timeline =
getTimeline(tableInputSpec.getDataSource(), tableInputSpec.getIntervals());
final Predicate<SegmentDescriptor> segmentFilter = tableInputSpec.getSegments() != null
? Set.copyOf(tableInputSpec.getSegments())::contains
: Predicates.alwaysTrue();

if (timeline == null) {
return Collections.emptySet();
} else {
// A segment can overlap with multiple search intervals, or even outside search intervals.
// The same segment can appear multiple times or 0 time, but each is also bounded within the overlapped search interval
Comment on lines +171 to +172
Copy link
Copy Markdown
Contributor

@capistrant capistrant Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I fully follow this comment.

or even outside search intervals

A segment outside search intervals. Is that referring to a segment that is in tableInputSpec.getSegments() but ends up not overlapping any search intervals and thus not get found?

The same segment can appear multiple times or 0 time

Same idea regarding the 0 time comment. Is that just saying that even though a segment was in tableInputSpec.getSegments(), it may not appear in the iterator?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think of it as a combination of MultipleIntervalSegmentSpec and MultipleSpecificSegmentSpec, on top of it this spec also has filter stuff.

the "0 times" case is actually not a change. Even before this change, if we have two intervals day0 and day1, we can have segments from both days, so we could build a DataSegmentWithInterval with a segment from day0 and interval day1, but it doesn't matter in the end. Although we usually only have 1 interval, so this doesn't happen often.

I guess the change here is that since segments can now be user input, they could be anything.

final Iterator<DataSegmentWithInterval> dataSegmentIterator =
tableInputSpec.getIntervals().stream()
.flatMap(interval -> timeline.lookup(interval).stream())
.flatMap(
holder ->
StreamSupport.stream(holder.getObject().spliterator(), false)
.filter(chunk -> !chunk.getObject().isTombstone())
.map(
chunk ->
new DataSegmentWithInterval(
chunk.getObject(),
holder.getInterval()
)
)
.map(PartitionChunk::getObject)
.filter(segment -> !segment.isTombstone())
.filter(segment -> segmentFilter.apply(segment.toDescriptor()))
.map(segment -> new DataSegmentWithInterval(segment, holder.getInterval()))
).iterator();

return DimFilterUtils.filterShards(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.LoadableSegment;
import org.apache.druid.msq.input.PhysicalInputSlice;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.DimFilter;
import org.joda.time.Interval;

Expand All @@ -44,6 +46,9 @@ public class TableInputSpec implements InputSpec
private final String dataSource;
private final List<Interval> intervals;

@Nullable
private final List<SegmentDescriptor> segments;

@Nullable
private final DimFilter filter;

Expand All @@ -58,6 +63,8 @@ public class TableInputSpec implements InputSpec
* meaning that when this spec is sliced and read, the returned {@link LoadableSegment}
* from {@link PhysicalInputSlice#getLoadableSegments()} are clipped to these intervals using
* {@link LoadableSegment#descriptor()}.
* @param segments specific segments to read, or null to read all segments in the intervals. If provided,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does there need to be documentation around what happens with non-null `intervals` and non-null `segments`, where some or all of those segments do not overlap with anything in `intervals`? I think that is the most interesting part of adding this support for specific segments, and potentially the most confusing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would just be the same as requesting year-2025 data when only year-2026 data is available. MSQ input allows it, although the compaction IO config doesn't allow this (that restriction is not part of this PR, though).

* only these segments will be read. Must not be empty if non-null.
* @param filter other filters to use for pruning, or null if no pruning is desired. Pruning filters are
* *not strict*, which means that processors must re-apply them when processing the returned
* {@link LoadableSegment} from {@link PhysicalInputSlice#getLoadableSegments()}. This matches how
Expand All @@ -69,16 +76,35 @@ public class TableInputSpec implements InputSpec
public TableInputSpec(
    @JsonProperty("dataSource") String dataSource,
    @JsonProperty("intervals") @Nullable List<Interval> intervals,
    @JsonProperty("segments") @Nullable List<SegmentDescriptor> segments,
    @JsonProperty("filter") @Nullable DimFilter filter,
    @JsonProperty("filterFields") @Nullable Set<String> filterFields
)
{
  // Validate before assigning any fields. An empty (but non-null) segment list would silently
  // match nothing, which is almost certainly a caller bug; null means "read all segments in
  // the intervals".
  if (segments != null && segments.isEmpty()) {
    throw new IAE("Cannot supply empty segments as input; please use either null or non-empty segments.");
  }

  this.dataSource = dataSource;
  // Null intervals means "all time": normalize to ONLY_ETERNITY so downstream code can assume non-null.
  this.intervals = intervals == null ? Intervals.ONLY_ETERNITY : intervals;
  this.segments = segments;
  this.filter = filter;
  this.filterFields = filterFields;
}

/**
 * Backwards-compatible constructor without a "segments" parameter. Delegates to the primary
 * constructor with {@code segments = null}, meaning all segments within the given intervals are read.
 *
 * @deprecated Use {@link #TableInputSpec(String, List, List, DimFilter, Set)} with explicit null for segments instead.
 */
@Deprecated
public TableInputSpec(
    String dataSource,
    @Nullable List<Interval> intervals,
    @Nullable DimFilter filter,
    @Nullable Set<String> filterFields
)
{
  this(dataSource, intervals, null, filter, filterFields);
}

@JsonProperty
public String getDataSource()
{
Expand All @@ -99,6 +125,14 @@ private List<Interval> getIntervalsForSerialization()
return intervals.equals(Intervals.ONLY_ETERNITY) ? null : intervals;
}

/**
 * Specific segments to read, or null to read all segments within the intervals.
 * Never empty when non-null (enforced by the constructor). Omitted from JSON when null.
 */
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public List<SegmentDescriptor> getSegments()
{
  return segments;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
Expand Down Expand Up @@ -127,14 +161,15 @@ public boolean equals(Object o)
TableInputSpec that = (TableInputSpec) o;
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(intervals, that.intervals)
&& Objects.equals(segments, that.segments)
&& Objects.equals(filter, that.filter)
&& Objects.equals(filterFields, that.filterFields);
}

@Override
public int hashCode()
{
  // Keep this field list in sync with equals(): dataSource, intervals, segments, filter, filterFields.
  return Objects.hash(dataSource, intervals, segments, filter, filterFields);
}

@Override
Expand All @@ -143,6 +178,7 @@ public String toString()
return "TableInputSpec{" +
"dataSource='" + dataSource + '\'' +
", intervals=" + intervals +
(segments == null ? "" : ", segments=" + segments) +
(filter == null ? "" : ", filter=" + filter) +
(filterFields == null ? "" : ", filterFields=" + filterFields) +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private static InputSpec translateDataSource(DataSource dataSource)
{
if (dataSource instanceof TableDataSource) {
TableDataSource ids = (TableDataSource) dataSource;
TableInputSpec inputSpec = new TableInputSpec(ids.getName(), Intervals.ONLY_ETERNITY, null, null);
TableInputSpec inputSpec = new TableInputSpec(ids.getName(), Intervals.ONLY_ETERNITY, null, null, null);
return inputSpec;
}
if (dataSource instanceof InlineDataSource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.RestrictedDataSource;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
Expand All @@ -61,7 +62,9 @@
import org.apache.druid.query.planning.JoinDataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinConditionAnalysis;
Expand Down Expand Up @@ -121,6 +124,11 @@ public class DataSourcePlan
}
}

/**
 * Returns a copy of this plan with {@code newDataSource} substituted, keeping this plan's
 * input specs, broadcast inputs, and subquery definition builder unchanged.
 */
DataSourcePlan withDataSource(DataSource newDataSource)
{
  return new DataSourcePlan(newDataSource, inputSpecs, broadcastInputs, subQueryDefBuilder);
}

/**
* Build a plan.
*
Expand Down Expand Up @@ -161,15 +169,15 @@ public static DataSourcePlan forDataSource(
if (dataSource instanceof TableDataSource) {
return forTable(
(TableDataSource) dataSource,
querySegmentSpecIntervals(querySegmentSpec),
querySegmentSpec,
filter,
filterFields,
broadcast
);
} else if (dataSource instanceof RestrictedDataSource) {
return forRestricted(
(RestrictedDataSource) dataSource,
querySegmentSpecIntervals(querySegmentSpec),
querySegmentSpec,
filter,
filterFields,
broadcast
Expand Down Expand Up @@ -360,36 +368,41 @@ public boolean isSingleWorker()

/**
 * Builds a plan for reading a {@link TableDataSource}.
 *
 * When the query names specific segments ({@link MultipleSpecificSegmentSpec} or
 * {@link SpecificSegmentSpec}), those descriptors are passed through to the {@link TableInputSpec}
 * so only the named segments are read; otherwise the spec's intervals alone determine which
 * segments are read.
 *
 * @param dataSource       table to read
 * @param querySegmentSpec intervals and/or specific segments to read
 * @param filter           pruning filter, or null if no pruning is desired (non-strict)
 * @param filterFields     fields of the filter used for pruning, or null
 * @param broadcast        whether this input is broadcast to all workers
 */
private static DataSourcePlan forTable(
    final TableDataSource dataSource,
    final QuerySegmentSpec querySegmentSpec,
    @Nullable final DimFilter filter,
    @Nullable final Set<String> filterFields,
    final boolean broadcast
)
{
  // Extract explicit segment descriptors when the spec names specific segments;
  // null means "read everything in the intervals".
  final List<SegmentDescriptor> segments;
  if (querySegmentSpec instanceof MultipleSpecificSegmentSpec) {
    segments = ((MultipleSpecificSegmentSpec) querySegmentSpec).getDescriptors();
  } else if (querySegmentSpec instanceof SpecificSegmentSpec) {
    segments = List.of(((SpecificSegmentSpec) querySegmentSpec).getDescriptor());
  } else {
    segments = null;
  }
  final List<Interval> intervals = querySegmentSpec.getIntervals();
  return new DataSourcePlan(
      // Broadcast global datasources can be read directly; everything else reads input number 0.
      (broadcast && dataSource.isGlobal()) ? dataSource : new InputNumberDataSource(0),
      List.of(new TableInputSpec(dataSource.getName(), intervals, segments, filter, filterFields)),
      broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
      null
  );
}

/**
 * Builds a plan for reading a {@link RestrictedDataSource}: plans the underlying table, then
 * swaps in a restriction-aware datasource.
 */
private static DataSourcePlan forRestricted(
    final RestrictedDataSource dataSource,
    final QuerySegmentSpec querySegmentSpec,
    @Nullable final DimFilter filter,
    @Nullable final Set<String> filterFields,
    final boolean broadcast
)
{
  // Broadcast global datasources can be read as-is; otherwise wrap input number 0 with the policy.
  final DataSource replacement;
  if (broadcast && dataSource.isGlobal()) {
    replacement = dataSource;
  } else {
    replacement = new RestrictedInputNumberDataSource(0, dataSource.getPolicy());
  }
  final DataSourcePlan basePlan = forTable(dataSource.getBase(), querySegmentSpec, filter, filterFields, broadcast);
  return basePlan.withDataSource(replacement);
}

private static DataSourcePlan forExternal(
Expand Down Expand Up @@ -791,7 +804,7 @@ private static List<Interval> querySegmentSpecIntervals(final QuerySegmentSpec q
/**
* Verify that the provided {@link QuerySegmentSpec} is a {@link MultipleIntervalSegmentSpec} with
* interval {@link Intervals#ETERNITY}. If not, throw an {@link UnsupportedOperationException}.
*
* <p>
* We don't need to support this for anything that is not {@link DataSourceAnalysis#isTableBased()}, because
* the SQL layer avoids "intervals" in other cases. See
* {@link org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void test_sliceDynamic()
{
// This slicer cannot sliceDynamic.

final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null);
final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null);
Assertions.assertFalse(slicer.canSliceDynamic(inputSpec));
Assertions.assertThrows(
UnsupportedOperationException.class,
Expand All @@ -257,7 +257,7 @@ public void test_sliceStatic_wholeTable_oneSlice()
// When 1 slice is requested, all segments are assigned to one server, even if that server doesn't actually
// currently serve those segments.

final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null);
final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null);
final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 1);
Assertions.assertEquals(
ImmutableList.of(
Expand Down Expand Up @@ -313,7 +313,7 @@ public void test_sliceStatic_wholeTable_twoSlices()
{
// When 2 slices are requested, we assign segments to the servers that have those segments.

final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null);
final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null);
final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
Assertions.assertEquals(
ImmutableList.of(
Expand Down Expand Up @@ -375,7 +375,7 @@ public void test_sliceStatic_wholeTable_threeSlices()
{
// When 3 slices are requested, only 2 are returned, because we only have two workers.

final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null);
final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null);
final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 3);
Assertions.assertEquals(
ImmutableList.of(
Expand Down Expand Up @@ -436,7 +436,7 @@ public void test_sliceStatic_wholeTable_threeSlices()
@Test
public void test_sliceStatic_nonexistentTable()
{
final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null);
final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE_NONEXISTENT, null, null, null, null);
final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 1);
Assertions.assertEquals(
Collections.emptyList(),
Expand All @@ -452,6 +452,7 @@ public void test_sliceStatic_dimensionFilter_twoSlices()
final TableInputSpec inputSpec = new TableInputSpec(
DATASOURCE,
null,
null,
new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null),
null
);
Expand Down Expand Up @@ -515,6 +516,7 @@ public void test_sliceStatic_timeFilter_twoSlices()
DATASOURCE,
Collections.singletonList(Intervals.of("2000/P1Y")),
null,
null,
null
);

Expand Down Expand Up @@ -559,7 +561,7 @@ void test_withoutRealtime_twoSlices()

// When 2 slices are requested, we assign segments to the servers that have those segments.

final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null);
final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null, null);
final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
// Expect segment 2 and then the realtime segments 5 and 6 to be assigned round-robin.
Assertions.assertEquals(
Expand Down
Loading
Loading