Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/querying/datasourcemetadataquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sidebar_label: "DatasourceMetadata"

Data Source Metadata queries return metadata information for a dataSource. These queries return information about:

* The timestamp of the latest ingested event for the dataSource. This is the ingested event without any consideration of rollup.
* `maxIngestedEventTime`: The timestamp of the latest ingested event for the dataSource. For realtime datasources, this may be later than `MAX(__time)` if `queryGranularity` is being used. For non-realtime datasources, this is equivalent to `MAX(__time)`.

The grammar for these queries is:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.MaxIngestedEventTimeInspector;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TimeBoundaryInspector;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Iterator;
import java.util.function.Supplier;

/**
*
*/
public class DataSourceMetadataQueryRunnerFactory
implements QueryRunnerFactory<Result<DataSourceMetadataResultValue>, DataSourceMetadataQuery>
Expand Down Expand Up @@ -81,12 +85,12 @@ public QueryToolChest<Result<DataSourceMetadataResultValue>, DataSourceMetadataQ
private static class DataSourceMetadataQueryRunner implements QueryRunner<Result<DataSourceMetadataResultValue>>
{
private final Interval segmentInterval;
private final MaxIngestedEventTimeInspector inspector;
private final Supplier<DateTime> inspector;

public DataSourceMetadataQueryRunner(Segment segment)
{
this.segmentInterval = segment.getDataInterval();
this.inspector = segment.as(MaxIngestedEventTimeInspector.class);
this.inspector = createInspector(segment);
}

@Override
Expand All @@ -110,7 +114,7 @@ public Iterator<Result<DataSourceMetadataResultValue>> make()
{
return legacyQuery.buildResult(
segmentInterval.getStart(),
(inspector != null ? inspector.getMaxIngestedEventTime() : null)
inspector.get()
).iterator();
}

Expand All @@ -122,5 +126,23 @@ public void cleanup(Iterator<Result<DataSourceMetadataResultValue>> toClean)
}
);
}

/**
 * Resolve a supplier of maxIngestedEventTime for the given segment.
 *
 * Preference order: a {@link MaxIngestedEventTimeInspector} when the segment
 * exposes one; otherwise the max time from a {@link TimeBoundaryInspector},
 * but only when its min/max bounds are exact; otherwise a constant null.
 */
private static Supplier<DateTime> createInspector(final Segment segment)
{
  final MaxIngestedEventTimeInspector eventTimeInspector = segment.as(MaxIngestedEventTimeInspector.class);
  if (eventTimeInspector != null) {
    return eventTimeInspector::getMaxIngestedEventTime;
  }

  final TimeBoundaryInspector boundaryInspector = segment.as(TimeBoundaryInspector.class);
  // An inexact boundary (e.g. interval-based estimate) must not be reported
  // as an ingested-event time, so fall through to null in that case.
  return (boundaryInspector != null && boundaryInspector.isMinMaxExact())
         ? boundaryInspector::getMaxTime
         : () -> null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.LogicalSegment;
Expand All @@ -63,8 +66,8 @@ public class DataSourceMetadataQueryTest
public void testQuerySerialization() throws IOException
{
Query<?> query = Druids.newDataSourceMetadataQueryBuilder()
.dataSource("testing")
.build();
.dataSource("testing")
.build();

String json = JSON_MAPPER.writeValueAsString(query);
Query<?> serdeQuery = JSON_MAPPER.readValue(json, Query.class);
Expand Down Expand Up @@ -113,13 +116,36 @@ public void testContextSerde() throws Exception
Assert.assertEquals(true, queryContext.getBoolean(QueryContexts.FINALIZE_KEY, false));
}

@Test
public void testMaxIngestedEventTime() throws Exception
/**
 * Build an index using a row with the provided event timestamp.
 *
 * The index holds a single row ("dim1" = "x") with a count aggregator, so the
 * segment's max ingested event time is exactly {@code eventTimestamp}.
 *
 * @param eventTimestamp timestamp assigned to the single ingested row
 * @return an on-heap incremental index containing one row
 * @throws Exception if the row cannot be added to the index
 */
private IncrementalIndex buildIndex(final DateTime eventTimestamp) throws Exception
{
final IncrementalIndex rtIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
.build();
// Single row is enough: maxIngestedEventTime tracks the latest add() call.
rtIndex.add(
new MapBasedInputRow(
eventTimestamp.getMillis(),
ImmutableList.of("dim1"),
ImmutableMap.of("dim1", "x")
)
);
return rtIndex;
}

@Test
public void testMaxIngestedEventTimeIncrementalIndex() throws Exception
{
final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z");
final IncrementalIndex rtIndex = buildIndex(timestamp);
DataSourceMetadataQuery dataSourceMetadataQuery =
Druids.newDataSourceMetadataQueryBuilder()
.dataSource("testing")
.build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();

final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
new DataSourceMetadataQueryRunnerFactory(
Expand All @@ -129,19 +155,36 @@ public void testMaxIngestedEventTime() throws Exception
new IncrementalIndexSegment(rtIndex, SegmentId.dummy("test")),
null
);
DateTime timestamp = DateTimes.nowUtc();
rtIndex.add(
new MapBasedInputRow(
timestamp.getMillis(),
ImmutableList.of("dim1"),
ImmutableMap.of("dim1", "x")
)
);
DataSourceMetadataQuery dataSourceMetadataQuery = Druids.newDataSourceMetadataQueryBuilder()
.dataSource("testing")
.build();

Iterable<Result<DataSourceMetadataResultValue>> results =
runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
DataSourceMetadataResultValue val = results.iterator().next().getValue();
DateTime maxIngestedEventTime = val.getMaxIngestedEventTime();

Assert.assertEquals(timestamp, maxIngestedEventTime);
}

@Test
public void testMaxIngestedEventTimeQueryableIndex() throws Exception
{
final DateTime timestamp = DateTimes.of("2020-01-02T03:04:05.678Z");
final QueryableIndex queryableIndex = TestIndex.persistAndMemoryMap(buildIndex(timestamp));
DataSourceMetadataQuery dataSourceMetadataQuery =
Druids.newDataSourceMetadataQueryBuilder()
.dataSource("testing")
.build();
ResponseContext context = ConcurrentResponseContext.createEmpty();
context.initializeMissingSegments();

final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
new DataSourceMetadataQueryRunnerFactory(
new DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
new QueryableIndexSegment(queryableIndex, SegmentId.dummy("test")),
null
);

Iterable<Result<DataSourceMetadataResultValue>> results =
runner.run(QueryPlus.wrap(dataSourceMetadataQuery), context).toList();
DataSourceMetadataResultValue val = results.iterator().next().getValue();
Expand Down