TimeBoundaryQueryRunnerFactory.java
@@ -37,6 +37,7 @@
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.Cursor;
@@ -45,13 +46,15 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Iterator;
import java.util.List;

/**
*
*/
public class TimeBoundaryQueryRunnerFactory
implements QueryRunnerFactory<Result<TimeBoundaryResultValue>, TimeBoundaryQuery>
@@ -142,7 +145,7 @@ public Sequence<Result<TimeBoundaryResultValue>> run(
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class);
}

final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input;
final TimeBoundaryQuery query = (TimeBoundaryQuery) input;

return new BaseSequence<>(
new BaseSequence.IteratorMaker<Result<TimeBoundaryResultValue>, Iterator<Result<TimeBoundaryResultValue>>>()
@@ -155,26 +158,31 @@ public Iterator<Result<TimeBoundaryResultValue>> make()
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final DateTime minTime;
final DateTime maxTime;

if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) {
minTime = getTimeBoundary(adapter, legacyQuery, false);
if (minTime == null) {
maxTime = null;
} else {
maxTime = getTimeBoundary(adapter, legacyQuery, true);

DateTime minTime = null;
DateTime maxTime = null;

if (canUseAdapterMinMaxTime(query, adapter)) {
if (!query.isMaxTime()) {
minTime = adapter.getMinTime();
}

if (!query.isMinTime()) {
maxTime = adapter.getMaxTime();
}
} else {
minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
? null
: adapter.getMinTime();
maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
? null
: adapter.getMaxTime();
if (!query.isMaxTime()) {
minTime = getTimeBoundary(adapter, query, false);
}

if (!query.isMinTime()) {
if (query.isMaxTime() || minTime != null) {
maxTime = getTimeBoundary(adapter, query, true);
}
}
}

return legacyQuery.buildResult(
return query.buildResult(
adapter.getInterval().getStart(),
minTime,
maxTime
@@ -186,17 +194,42 @@ public void cleanup(Iterator<Result<TimeBoundaryResultValue>> toClean)
{

}

private boolean queryIntervalContainsAdapterInterval()
{
List<Interval> queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals();
if (queryIntervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", queryIntervals);
}
return queryIntervals.get(0).contains(adapter.getInterval());
}
}
);
}
}

/**
* Whether a particular {@link TimeBoundaryQuery} can use {@link StorageAdapter#getMinTime()} and/or
* {@link StorageAdapter#getMaxTime()}. If false, must use {@link StorageAdapter#makeCursors}.
*/
private static boolean canUseAdapterMinMaxTime(final TimeBoundaryQuery query, final StorageAdapter adapter)
{
if (query.getFilter() != null) {
// We have to check which rows actually match the filter.
return false;
}

if (!(query.getDataSource() instanceof TableDataSource)) {
// In general, minTime / maxTime are only guaranteed to match data for regular tables.
//
// One example: an INNER JOIN can act as a filter and remove some rows. Another example: RowBasedStorageAdapter
// (used by e.g. inline data) uses nominal interval, not actual data, for minTime / maxTime.
return false;
}

final Interval queryInterval = CollectionUtils.getOnlyElement(
query.getQuerySegmentSpec().getIntervals(),
xs -> new IAE("Should only have one interval, got[%s]", xs)
);

if (!queryInterval.contains(adapter.getInterval())) {
// Query interval does not contain adapter interval. Need to create a cursor to see the first
// timestamp within the query interval.
return false;
}

// Passed all checks.
return true;
}
}
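The hunks above replace the old interval-containment check with canUseAdapterMinMaxTime: segment metadata is trusted only when there is no filter, the datasource is a plain table, and the query interval covers the whole segment interval; otherwise the runner has to read the boundary from the data. As a rough illustration of that fallback, the sketch below reads a boundary timestamp from a cursor over the __time column. It is a simplified stand-in for the factory's existing getTimeBoundary helper, not a copy of it; it assumes the imports already present in the file (plus javax.annotation.Nullable), and the method name scanForTimeBoundary is illustrative only.

// Simplified sketch of the cursor-based fallback (illustrative; the real helper in this file
// is getTimeBoundary). With Granularities.ALL and a descending cursor, the first row seen is
// the latest one in the scanned interval; an ascending cursor yields the earliest.
@Nullable
private static DateTime scanForTimeBoundary(
    final StorageAdapter adapter,
    final TimeBoundaryQuery query,
    final boolean descending
)
{
  final List<Cursor> cursors = adapter.makeCursors(
      Filters.toFilter(query.getFilter()),                // honor the query filter, if any
      query.getQuerySegmentSpec().getIntervals().get(0),  // the single query interval
      VirtualColumns.EMPTY,
      Granularities.ALL,
      descending,
      null
  ).toList();

  for (final Cursor cursor : cursors) {
    if (!cursor.isDone()) {
      // Read the __time value of the first row the cursor returns.
      final BaseLongColumnValueSelector timeSelector =
          cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
      return DateTimes.utc(timeSelector.getLong());
    }
  }

  return null; // no rows matched the filter / interval
}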
StorageAdapter.java
@@ -66,7 +66,29 @@ default RowSignature getRowSignature()
* (or null) values.
*/
int getDimensionCardinality(String column);

/**
* Metadata-only operation that returns a lower bound on
* {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be earlier than
* the actual minimum data timestamp.
*
* For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter}
* specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), the contract of this
* method is tighter: it does return the actual minimum data timestamp. This fact is leveraged by
* {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only.
*/
DateTime getMinTime();

/**
* Metadata-only operation that returns an upper bound on
* {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be later than
* the actual maximum data timestamp.
*
* For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter}
* specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), the contract of this
* method is tighter: it does return the actual maximum data timestamp. This fact is leveraged by
* {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only.
*/
DateTime getMaxTime();

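The tightened javadoc above draws a line between a bound and an exact value. A minimal sketch of how a caller can rely on it follows; the helper name metadataTimeBounds and the idea of wrapping the bounds in an interval are illustrative assumptions, while Segment, StorageAdapter, and Joda's Interval are the existing types. The returned interval always encloses the data, but its endpoints are only guaranteed to be attained by real rows when the adapter backs a regular table.

// Illustrative only: getMinTime()/getMaxTime() always enclose the data, so they can serve as a
// metadata-only interval. Treat them as exact data timestamps only for table-backed adapters
// (QueryableIndexStorageAdapter, IncrementalIndexStorageAdapter), as the javadoc notes.
static Interval metadataTimeBounds(final Segment segment)
{
  final StorageAdapter adapter = segment.asStorageAdapter();
  // Joda Interval ends are exclusive, so extend the upper bound by one millisecond.
  return new Interval(adapter.getMinTime(), adapter.getMaxTime().plus(1));
}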
TimeBoundaryQueryRunnerTest.java
@@ -28,7 +28,9 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@@ -41,11 +43,14 @@
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -64,9 +69,10 @@
import java.util.List;

/**
*
*/
@RunWith(Parameterized.class)
public class TimeBoundaryQueryRunnerTest
public class TimeBoundaryQueryRunnerTest extends InitializedNullHandlingTest
{
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
@@ -98,7 +104,7 @@ public TimeBoundaryQueryRunnerTest(
"2011-01-12T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t100.000000",
"2011-01-13T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t100.000000",
"2011-01-13T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t100.000000",
};
};
public static final String[] V_0113 = {
"2011-01-14T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t94.874713",
"2011-01-14T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
@@ -109,7 +115,7 @@
"2011-01-16T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
"2011-01-17T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t103.629399",
"2011-01-17T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299",
};
};

private static IncrementalIndex newIndex(String minTimeStamp)
{
@@ -150,7 +156,8 @@ private QueryRunner getCustomRunner() throws IOException
segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1"));
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));

VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(
StringComparators.LEXICOGRAPHIC);
timeline.add(
index0.getInterval(),
"v1",
@@ -197,7 +204,8 @@ public void testTimeFilteredTimeBoundaryQuery() throws IOException
.dataSource("testing")
.intervals(
new MultipleIntervalSegmentSpec(
ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
ImmutableList.of(Intervals.of(
"2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z"))
)
)
.build();
@@ -247,14 +255,46 @@ public void testTimeBoundary()
Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime);
}

@Test
public void testTimeBoundaryInlineData()
{
final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{DateTimes.of("2000-01-02").getMillis()}),
RowSignature.builder().addTimeColumn().build()
);

TimeBoundaryQuery timeBoundaryQuery =
Druids.newTimeBoundaryQueryBuilder()
.dataSource(inlineDataSource)
.build();

Assert.assertFalse(timeBoundaryQuery.hasFilters());
final QueryRunner<Result<TimeBoundaryResultValue>> theRunner =
new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER).createRunner(
new RowBasedSegment<>(
SegmentId.dummy("dummy"),
Sequences.simple(inlineDataSource.getRows()),
inlineDataSource.rowAdapter(),
inlineDataSource.getRowSignature()
)
);
Iterable<Result<TimeBoundaryResultValue>> results = theRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList();
TimeBoundaryResultValue val = results.iterator().next().getValue();
DateTime minTime = val.getMinTime();
DateTime maxTime = val.getMaxTime();

Assert.assertEquals(DateTimes.of("2000-01-02"), minTime);
Assert.assertEquals(DateTimes.of("2000-01-02"), maxTime);
}

@Test(expected = UOE.class)
@SuppressWarnings("unchecked")
public void testTimeBoundaryArrayResults()
{
TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.bound(null)
.build();
.dataSource("testing")
.bound(null)
.build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();
new TimeBoundaryQueryQueryToolChest().resultsAsArrays(
@@ -24,11 +24,15 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Test;

import java.util.HashMap;
@@ -131,4 +135,47 @@ public void testMinMaxTimeQuery()
})
);
}

@Test
public void testMaxTimeQueryWithJoin()
{
// Cannot vectorize due to JOIN.
cannotVectorize();

HashMap<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
context.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true);

testBuilder()
.sql("SELECT MAX(t1.__time)\n"
+ "FROM foo t1\n"
+ "INNER JOIN foo t2 ON CAST(t1.m1 AS BIGINT) = t2.cnt\n")
.queryContext(context)
.expectedQueries(
ImmutableList.of(
Druids.newTimeBoundaryQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("cnt")
.context(context)
.build()
),
"j0.",
equalsCondition(makeExpression("CAST(\"m1\", 'LONG')"), makeColumnExpression("j0.cnt")),
JoinType.INNER
)

)
.bound(TimeBoundaryQuery.MAX_TIME)
.context(context)
.build()
)
)
.expectedResults(ImmutableList.of(new Object[]{946684800000L}))
.run();
}
}