From 9ffc0475a9013eb575cd39ef0a0db90ce4d10e83 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 30 Jan 2025 09:32:18 -0800 Subject: [PATCH] DataSourceMetadataQuery: Use TimeBoundaryInspector as a fallback. PR #16849 changed the behavior such that maxIngestedEventTime is not updated for non-real-time data. This patch restores the old behavior for non-real-time data by using a TimeBoundaryInspector when MaxIngestedEventTimeInspector is not present. --- docs/querying/datasourcemetadataquery.md | 2 +- .../DataSourceMetadataQueryRunnerFactory.java | 28 ++++++- .../DataSourceMetadataQueryTest.java | 73 +++++++++++++++---- 3 files changed, 84 insertions(+), 19 deletions(-) diff --git a/docs/querying/datasourcemetadataquery.md b/docs/querying/datasourcemetadataquery.md index 1f1cc0d49d5a..0a77426e765f 100644 --- a/docs/querying/datasourcemetadataquery.md +++ b/docs/querying/datasourcemetadataquery.md @@ -31,7 +31,7 @@ sidebar_label: "DatasourceMetadata" Data Source Metadata queries return metadata information for a dataSource. These queries return information about: -* The timestamp of the latest ingested event for the dataSource. This is the ingested event without any consideration of rollup. +* `maxIngestedEventTime`: The timestamp of the latest ingested event for the dataSource. For realtime datasources, this may be later than `MAX(__time)` if `queryGranularity` is being used. For non-realtime datasources, this is equivalent to `MAX(__time)`. 
The grammar for these queries is: diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java index 0a5ef2a7680a..645f65bd8ada 100644 --- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java @@ -35,11 +35,15 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.segment.MaxIngestedEventTimeInspector; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.TimeBoundaryInspector; +import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.Iterator; +import java.util.function.Supplier; /** + * */ public class DataSourceMetadataQueryRunnerFactory implements QueryRunnerFactory<Result<DataSourceMetadataResultValue>, DataSourceMetadataQuery> @@ -81,12 +85,12 @@ public QueryToolChest<Result<DataSourceMetadataResultValue>, DataSourceMetadataQ private static class DataSourceMetadataQueryRunner implements QueryRunner<Result<DataSourceMetadataResultValue>> { private final Interval segmentInterval; - private final MaxIngestedEventTimeInspector inspector; + private final Supplier<DateTime> inspector; public DataSourceMetadataQueryRunner(Segment segment) { this.segmentInterval = segment.getDataInterval(); - this.inspector = segment.as(MaxIngestedEventTimeInspector.class); + this.inspector = createInspector(segment); } @Override @@ -110,7 +114,7 @@ public Iterator<Result<DataSourceMetadataResultValue>> make() { return legacyQuery.buildResult( segmentInterval.getStart(), - (inspector != null ? inspector.getMaxIngestedEventTime() : null) + inspector.get() ).iterator(); } @@ -122,5 +126,23 @@ public void cleanup(Iterator<Result<DataSourceMetadataResultValue>> toClean) } ); } + + /** + * Create a maxIngestedEventTime supplier for a given segment. 
+ */ + private static Supplier<DateTime> createInspector(final Segment segment) + { + final MaxIngestedEventTimeInspector ingestedEventTimeInspector = segment.as(MaxIngestedEventTimeInspector.class); + if (ingestedEventTimeInspector != null) { + return ingestedEventTimeInspector::getMaxIngestedEventTime; + } + + final TimeBoundaryInspector timeBoundaryInspector = segment.as(TimeBoundaryInspector.class); + if (timeBoundaryInspector != null && timeBoundaryInspector.isMinMaxExact()) { + return timeBoundaryInspector::getMaxTime; + } + + return () -> null; + } } } diff --git a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index 3f0c0401e9a9..e5b17e164a75 100644 --- a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -41,6 +41,9 @@ import org.apache.druid.query.context.ConcurrentResponseContext; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.segment.IncrementalIndexSegment; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.timeline.LogicalSegment; @@ -63,8 +66,8 @@ public class DataSourceMetadataQueryTest public void testQuerySerialization() throws IOException { Query query = Druids.newDataSourceMetadataQueryBuilder() - .dataSource("testing") - .build(); + .dataSource("testing") + .build(); String json = JSON_MAPPER.writeValueAsString(query); Query serdeQuery = JSON_MAPPER.readValue(json, Query.class); @@ -113,13 +116,36 @@ public void testContextSerde() throws Exception 
Assert.assertEquals(true, queryContext.getBoolean(QueryContexts.FINALIZE_KEY, false)); } - @Test - public void testMaxIngestedEventTime() throws Exception + /** + * Build an index using a row with the provided event timestamp. + */ + private IncrementalIndex buildIndex(final DateTime eventTimestamp) throws Exception { final IncrementalIndex rtIndex = new OnheapIncrementalIndex.Builder() .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) .setMaxRowCount(1000) .build(); + rtIndex.add( + new MapBasedInputRow( + eventTimestamp.getMillis(), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "x") + ) + ); + return rtIndex; + } + + @Test + public void testMaxIngestedEventTimeIncrementalIndex() throws Exception + { + final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z"); + final IncrementalIndex rtIndex = buildIndex(timestamp); + DataSourceMetadataQuery dataSourceMetadataQuery = + Druids.newDataSourceMetadataQueryBuilder() + .dataSource("testing") + .build(); + ResponseContext context = ConcurrentResponseContext.createEmpty(); + context.initializeMissingSegments(); final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner( new DataSourceMetadataQueryRunnerFactory( @@ -129,19 +155,36 @@ public void testMaxIngestedEventTime() throws Exception new IncrementalIndexSegment(rtIndex, SegmentId.dummy("test")), null ); - DateTime timestamp = DateTimes.nowUtc(); - rtIndex.add( - new MapBasedInputRow( - timestamp.getMillis(), - ImmutableList.of("dim1"), - ImmutableMap.of("dim1", "x") - ) - ); - DataSourceMetadataQuery dataSourceMetadataQuery = Druids.newDataSourceMetadataQueryBuilder() - .dataSource("testing") - .build(); + + Iterable<Result<DataSourceMetadataResultValue>> results = + runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList(); + DataSourceMetadataResultValue val = results.iterator().next().getValue(); + DateTime maxIngestedEventTime = val.getMaxIngestedEventTime(); + + Assert.assertEquals(timestamp, maxIngestedEventTime); + } + + @Test + public void 
testMaxIngestedEventTimeQueryableIndex() throws Exception + { + final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z"); + final QueryableIndex queryableIndex = TestIndex.persistAndMemoryMap(buildIndex(timestamp)); + DataSourceMetadataQuery dataSourceMetadataQuery = + Druids.newDataSourceMetadataQueryBuilder() + .dataSource("testing") + .build(); ResponseContext context = ConcurrentResponseContext.createEmpty(); context.initializeMissingSegments(); + + final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner( + new DataSourceMetadataQueryRunnerFactory( + new DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()), + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ), + new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")), + null + ); + Iterable<Result<DataSourceMetadataResultValue>> results = runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList(); DataSourceMetadataResultValue val = results.iterator().next().getValue();