From 5e17cd55bf4479a284b78741f7564aca2b31a22d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 26 Apr 2023 17:00:13 -0700 Subject: [PATCH] TimeBoundary: Use cursor when datasource is not a regular table. (#14151) * TimeBoundary: Use cursor when datasource is not a regular table. Fixes a bug where TimeBoundary could return incorrect results with INNER Join or inline data. * Addl Javadocs. --- .../TimeBoundaryQueryRunnerFactory.java | 85 +++++++++++++------ .../apache/druid/segment/StorageAdapter.java | 22 +++++ .../TimeBoundaryQueryRunnerTest.java | 56 ++++++++++-- .../calcite/CalciteTimeBoundaryQueryTest.java | 47 ++++++++++ 4 files changed, 176 insertions(+), 34 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index 71a1938edaca..7a695987fb76 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -37,6 +37,7 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.Result; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.Cursor; @@ -45,6 +46,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.filter.Filters; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -52,6 +54,7 @@ import java.util.List; /** + * */ public class TimeBoundaryQueryRunnerFactory implements QueryRunnerFactory, TimeBoundaryQuery> @@ -142,7 +145,7 @@ public Sequence> run( throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TimeBoundaryQuery.class); } - final TimeBoundaryQuery legacyQuery = (TimeBoundaryQuery) input; + final TimeBoundaryQuery query = (TimeBoundaryQuery) input; return new BaseSequence<>( new BaseSequence.IteratorMaker, Iterator>>() @@ -155,26 +158,31 @@ public Iterator> make() "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped." ); } - final DateTime minTime; - final DateTime maxTime; - - if (legacyQuery.getFilter() != null || !queryIntervalContainsAdapterInterval()) { - minTime = getTimeBoundary(adapter, legacyQuery, false); - if (minTime == null) { - maxTime = null; - } else { - maxTime = getTimeBoundary(adapter, legacyQuery, true); + + DateTime minTime = null; + DateTime maxTime = null; + + if (canUseAdapterMinMaxTime(query, adapter)) { + if (!query.isMaxTime()) { + minTime = adapter.getMinTime(); + } + + if (!query.isMinTime()) { + maxTime = adapter.getMaxTime(); } } else { - minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME) - ? null - : adapter.getMinTime(); - maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME) - ? null - : adapter.getMaxTime(); + if (!query.isMaxTime()) { + minTime = getTimeBoundary(adapter, query, false); + } + + if (!query.isMinTime()) { + if (query.isMaxTime() || minTime != null) { + maxTime = getTimeBoundary(adapter, query, true); + } + } } - return legacyQuery.buildResult( + return query.buildResult( adapter.getInterval().getStart(), minTime, maxTime @@ -186,17 +194,42 @@ public void cleanup(Iterator> toClean) { } - - private boolean queryIntervalContainsAdapterInterval() - { - List queryIntervals = legacyQuery.getQuerySegmentSpec().getIntervals(); - if (queryIntervals.size() != 1) { - throw new IAE("Should only have one interval, got[%s]", queryIntervals); - } - return queryIntervals.get(0).contains(adapter.getInterval()); - } } ); } } + + /** + * Whether a particular {@link TimeBoundaryQuery} can use {@link StorageAdapter#getMinTime()} and/or + * {@link StorageAdapter#getMaxTime()}. If false, must use {@link StorageAdapter#makeCursors}. + */ + private static boolean canUseAdapterMinMaxTime(final TimeBoundaryQuery query, final StorageAdapter adapter) + { + if (query.getFilter() != null) { + // We have to check which rows actually match the filter. + return false; + } + + if (!(query.getDataSource() instanceof TableDataSource)) { + // In general, minTime / maxTime are only guaranteed to match data for regular tables. + // + // One example: an INNER JOIN can act as a filter and remove some rows. Another example: RowBasedStorageAdapter + // (used by e.g. inline data) uses nominal interval, not actual data, for minTime / maxTime. + return false; + } + + final Interval queryInterval = CollectionUtils.getOnlyElement( + query.getQuerySegmentSpec().getIntervals(), + xs -> new IAE("Should only have one interval, got[%s]", xs) + ); + + if (!queryInterval.contains(adapter.getInterval())) { + // Query interval does not contain adapter interval. Need to create a cursor to see the first + // timestamp within the query interval. + return false; + } + + // Passed all checks. + return true; + } } diff --git a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java index ce23ae7d1ccb..2d3fc6a50bbb 100644 --- a/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/StorageAdapter.java @@ -66,7 +66,29 @@ default RowSignature getRowSignature() * (or null) values. */ int getDimensionCardinality(String column); + + /** + * Metadata-only operation that returns a lower bound on + * {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be earlier than + * the actual minimum data timestamp. + * + * For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter} + * specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), this method + * contract is tighter: it does return the actual minimum data timestamp. This fact is leveraged by + * {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only. + */ DateTime getMinTime(); + + /** + * Metadata-only operation that returns an upper bound on + * {@link org.apache.druid.segment.column.ColumnHolder#TIME_COLUMN_NAME} values for this adapter. May be later than + * the actual maximum data timestamp. + * + * For {@link QueryableIndexStorageAdapter} and {@link org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter} + * specifically, which back regular tables (i.e. {@link org.apache.druid.query.TableDataSource}), this method + * contract is tighter: it does return the actual maximum data timestamp. This fact is leveraged by + * {@link org.apache.druid.query.timeboundary.TimeBoundaryQuery} to return results using metadata only. + */ DateTime getMaxTime(); /** diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index 4b46bbcf879e..02ba0ac44f4f 100644 --- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -28,7 +28,9 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.Druids; +import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -41,11 +43,14 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -64,9 +69,10 @@ import java.util.List; /** + * */ @RunWith(Parameterized.class) -public class TimeBoundaryQueryRunnerTest +public class TimeBoundaryQueryRunnerTest extends InitializedNullHandlingTest { @Parameterized.Parameters(name = "{0}") public static Iterable constructorFeeder() @@ -98,7 +104,7 @@ public TimeBoundaryQueryRunnerTest( "2011-01-12T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t100.000000", "2011-01-13T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t100.000000", "2011-01-13T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t100.000000", - }; + }; public static final String[] V_0113 = { "2011-01-14T00:00:00.000Z\tspot\tautomotive\t1000\t10000.0\t100000\tpreferred\tapreferred\t94.874713", "2011-01-14T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299", @@ -109,7 +115,7 @@ public TimeBoundaryQueryRunnerTest( "2011-01-16T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299", "2011-01-17T01:00:00.000Z\tspot\tbusiness\t1100\t11000.0\t110000\tpreferred\tbpreferred\t103.629399", "2011-01-17T02:00:00.000Z\tspot\tentertainment\t1200\t12000.0\t120000\tpreferred\tepreferred\t110.087299", - }; + }; private static IncrementalIndex newIndex(String minTimeStamp) { @@ -150,7 +156,8 @@ private QueryRunner getCustomRunner() throws IOException segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1")); segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1")); - VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC); + VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( + StringComparators.LEXICOGRAPHIC); timeline.add( index0.getInterval(), "v1", @@ -197,7 +204,8 @@ public void testTimeFilteredTimeBoundaryQuery() throws IOException .dataSource("testing") .intervals( new MultipleIntervalSegmentSpec( - ImmutableList.of(Intervals.of("2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z")) + ImmutableList.of(Intervals.of( + "2011-01-15T00:00:00.000Z/2011-01-16T00:00:00.000Z")) ) ) .build(); @@ -247,14 +255,46 @@ public void testTimeBoundary() Assert.assertEquals(DateTimes.of("2011-04-15T00:00:00.000Z"), maxTime); } + @Test + public void testTimeBoundaryInlineData() + { + final InlineDataSource inlineDataSource = InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{DateTimes.of("2000-01-02").getMillis()}), + RowSignature.builder().addTimeColumn().build() + ); + + TimeBoundaryQuery timeBoundaryQuery = + Druids.newTimeBoundaryQueryBuilder() + .dataSource(inlineDataSource) + .build(); + + Assert.assertFalse(timeBoundaryQuery.hasFilters()); + final QueryRunner> theRunner = + new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER).createRunner( + new RowBasedSegment<>( + SegmentId.dummy("dummy"), + Sequences.simple(inlineDataSource.getRows()), + inlineDataSource.rowAdapter(), + inlineDataSource.getRowSignature() + ) + ); + Iterable> results = theRunner.run(QueryPlus.wrap(timeBoundaryQuery)).toList(); + TimeBoundaryResultValue val = results.iterator().next().getValue(); + DateTime minTime = val.getMinTime(); + DateTime maxTime = val.getMaxTime(); + + Assert.assertEquals(DateTimes.of("2000-01-02"), minTime); + Assert.assertEquals(DateTimes.of("2000-01-02"), maxTime); + } + @Test(expected = UOE.class) @SuppressWarnings("unchecked") public void testTimeBoundaryArrayResults() { TimeBoundaryQuery timeBoundaryQuery = Druids.newTimeBoundaryQueryBuilder() - .dataSource("testing") - .bound(null) - .build(); + .dataSource("testing") + .bound(null) + .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); context.initializeMissingSegments(); new TimeBoundaryQueryQueryToolChest().resultsAsArrays( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java index bb33b7141172..c4cd002fbeac 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteTimeBoundaryQueryTest.java @@ -24,11 +24,15 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeboundary.TimeBoundaryQuery; +import org.apache.druid.segment.join.JoinType; import org.apache.druid.sql.calcite.filtration.Filtration; +import org.apache.druid.sql.calcite.util.CalciteTests; import org.junit.Test; import java.util.HashMap; @@ -131,4 +135,47 @@ public void testMinMaxTimeQuery() }) ); } + + @Test + public void testMaxTimeQueryWithJoin() + { + // Cannot vectorize due to JOIN. + cannotVectorize(); + + HashMap context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + context.put(QueryContexts.TIME_BOUNDARY_PLANNING_KEY, true); + + testBuilder() + .sql("SELECT MAX(t1.__time)\n" + + "FROM foo t1\n" + + "INNER JOIN foo t2 ON CAST(t1.m1 AS BIGINT) = t2.cnt\n") + .queryContext(context) + .expectedQueries( + ImmutableList.of( + Druids.newTimeBoundaryQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("cnt") + .context(context) + .build() + ), + "j0.", + equalsCondition(makeExpression("CAST(\"m1\", 'LONG')"), makeColumnExpression("j0.cnt")), + JoinType.INNER + ) + + ) + .bound(TimeBoundaryQuery.MAX_TIME) + .context(context) + .build() + ) + ) + .expectedResults(ImmutableList.of(new Object[]{946684800000L})) + .run(); + } }