1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
@@ -87,6 +87,7 @@ See [groupBy server configuration](../querying/groupbyquery.html#server-configuration)
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.segmentMetadata.defaultHistory`|When no interval is specified in the query, use a default interval of defaultHistory before the end time of the most recent segment, specified in ISO8601 format. This property also controls the duration of the default interval used by GET /druid/v2/datasources/{dataSourceName} interactions for retrieving datasource dimensions/metrics.|P1W|
|`druid.query.segmentMetadata.defaultAnalysisTypes`|Sets the default analysis types for all segment metadata queries. This can be overridden in an individual query.|["cardinality", "interval", "minmax"]|
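
For example, a Broker could trim the default down to the cheaper analyses by setting `druid.query.segmentMetadata.defaultAnalysisTypes=["cardinality","minmax"]` in its runtime.properties (an illustrative value; list-valued Druid properties are written as JSON arrays).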

### SQL

7 changes: 5 additions & 2 deletions docs/content/querying/segmentmetadataquery.md
@@ -32,7 +32,7 @@ There are several main parts to a segment metadata query:
|toInclude|A JSON Object representing what columns should be included in the result. Defaults to "all".|no|
|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.html)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "interval", "minmax"]. See section [analysisTypes](#analysistypes) for more details.|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "interval", "minmax"], which can be overridden via the [broker configuration](../configuration/broker.html#segment-metadata-query-config). See section [analysisTypes](#analysistypes) for more details.|no|
|lenientAggregatorMerge|If true, and if the "aggregators" analysisType is enabled, aggregators will be merged leniently. See below for details.|no|

The format of the result is:
@@ -106,7 +106,10 @@ The grammar is as follows:

This is a list of properties that determines the amount of information returned about the columns, i.e. analyses to be performed on the columns.

By default, the "cardinality", "size", "interval", and "minmax" types will be used. If a property is not needed, omitting it from this list will result in a more efficient query.
By default, the "cardinality", "interval", and "minmax" types will be used. If a property is not needed, omitting it from this list will result in a more efficient query.

The default analysis types can be set in the broker configuration via:
`druid.query.segmentMetadata.defaultAnalysisTypes`
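
An individual query can still override the configured default by supplying its own list. A minimal illustrative query (the datasource and interval are placeholders):

```json
{
  "queryType": "segmentMetadata",
  "dataSource": "sample_datasource",
  "intervals": ["2013-01-01/2014-01-01"],
  "analysisTypes": ["cardinality", "minmax"]
}
```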

Types of column analyses are described below:

io/druid/query/metadata/SegmentMetadataQueryConfig.java
@@ -20,19 +20,29 @@
package io.druid.query.metadata;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.joda.time.format.PeriodFormatter;

import java.util.EnumSet;

public class SegmentMetadataQueryConfig
{
private static final String DEFAULT_PERIOD_STRING = "P1W";
private static final PeriodFormatter ISO_FORMATTER = ISOPeriodFormat.standard();
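// These defaults were previously hard-coded in SegmentMetadataQuery (see the removal
// in SegmentMetadataQuery.java below); they live here now so the broker can override them.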
private static final EnumSet<SegmentMetadataQuery.AnalysisType> DEFAULT_ANALYSIS_TYPES = EnumSet.of(
SegmentMetadataQuery.AnalysisType.CARDINALITY,
SegmentMetadataQuery.AnalysisType.INTERVAL,
SegmentMetadataQuery.AnalysisType.MINMAX
);

@JsonProperty
private Period defaultHistory = ISO_FORMATTER.parsePeriod(DEFAULT_PERIOD_STRING);

@JsonProperty
private EnumSet<SegmentMetadataQuery.AnalysisType> defaultAnalysisTypes = DEFAULT_ANALYSIS_TYPES;

public SegmentMetadataQueryConfig(String period)
{
defaultHistory = ISO_FORMATTER.parsePeriod(period);
@@ -46,4 +56,16 @@ public Period getDefaultHistory()
{
return defaultHistory;
}

public void setDefaultHistory(String period)
{
this.defaultHistory = ISO_FORMATTER.parsePeriod(period);
}

public EnumSet<SegmentMetadataQuery.AnalysisType> getDefaultAnalysisTypes()
{
return defaultAnalysisTypes;
}

public void setDefaultAnalysisTypes(EnumSet<SegmentMetadataQuery.AnalysisType> defaultAnalysisTypes)
{
this.defaultAnalysisTypes = defaultAnalysisTypes;
}
}
io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -38,6 +38,7 @@
import io.druid.java.util.common.guava.MappedSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.BySegmentSkippingQueryRunner;
import io.druid.query.CacheStrategy;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.GenericQueryMetricsFactory;
@@ -46,7 +47,6 @@
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.MetricManipulationFn;
@@ -89,7 +89,10 @@ public SegmentMetadataQueryQueryToolChest(SegmentMetadataQueryConfig config)
}

@Inject
public SegmentMetadataQueryQueryToolChest(SegmentMetadataQueryConfig config, GenericQueryMetricsFactory queryMetricsFactory)
public SegmentMetadataQueryQueryToolChest(
SegmentMetadataQueryConfig config,
GenericQueryMetricsFactory queryMetricsFactory
)
{
this.config = config;
this.queryMetricsFactory = queryMetricsFactory;
@@ -98,7 +101,7 @@ public SegmentMetadataQueryQueryToolChest(SegmentMetadataQueryConfig config, Gen
@Override
public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner)
{
return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
return new BySegmentSkippingQueryRunner<SegmentAnalysis>(runner)
{
@Override
public Sequence<SegmentAnalysis> doRun(
@@ -107,21 +110,21 @@ public Sequence<SegmentAnalysis> doRun(
Map<String, Object> context
)
{
Query<SegmentAnalysis> query = queryPlus.getQuery();
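// Finalize the query first: if it omitted analysisTypes, fill them in from the
// broker's SegmentMetadataQueryConfig so that ordering and merging see the final form.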
SegmentMetadataQuery updatedQuery = ((SegmentMetadataQuery) queryPlus.getQuery()).withFinalizedAnalysisTypes(config);
QueryPlus<SegmentAnalysis> updatedQueryPlus = queryPlus.withQuery(updatedQuery);
return new MappedSequence<>(
CombiningSequence.create(
baseRunner.run(queryPlus, context),
makeOrdering(query),
createMergeFn(query)
baseRunner.run(updatedQueryPlus, context),
makeOrdering(updatedQuery),
createMergeFn(updatedQuery)
),
MERGE_TRANSFORM_FN
);
}

@Override
protected Ordering<SegmentAnalysis> makeOrdering(Query<SegmentAnalysis> query)
private Ordering<SegmentAnalysis> makeOrdering(SegmentMetadataQuery query)
{
if (((SegmentMetadataQuery) query).isMerge()) {
if (query.isMerge()) {
// Merge everything always
return new Ordering<SegmentAnalysis>()
{
@@ -138,17 +141,14 @@ public int compare(
return query.getResultOrdering(); // No two elements should be equal, so it should never merge
}

@Override
protected BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> createMergeFn(final Query<SegmentAnalysis> inQ)
private BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> createMergeFn(final SegmentMetadataQuery inQ)
{
return new BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis>()
{
private final SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;

@Override
public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
{
return mergeAnalyses(arg1, arg2, query.isLenientAggregatorMerge());
return mergeAnalyses(arg1, arg2, inQ.isLenientAggregatorMerge());
}
};
}
@@ -189,8 +189,9 @@ public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
@Override
public byte[] computeCacheKey(SegmentMetadataQuery query)
{
byte[] includerBytes = query.getToInclude().getCacheKey();
byte[] analysisTypesBytes = query.getAnalysisTypesCacheKey();
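// Finalize analysis types before building the cache key, so a query relying on the
// configured defaults caches identically to one that lists them explicitly.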
SegmentMetadataQuery updatedQuery = query.withFinalizedAnalysisTypes(config);
byte[] includerBytes = updatedQuery.getToInclude().getCacheKey();
byte[] analysisTypesBytes = updatedQuery.getAnalysisTypesCacheKey();
return ByteBuffer.allocate(1 + includerBytes.length + analysisTypesBytes.length)
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
@@ -238,7 +239,6 @@ public <T extends LogicalSegment> List<T> filterSegments(SegmentMetadataQuery qu
if (!query.isUsingDefaultInterval()) {
return segments;
}

if (segments.size() <= 1) {
return segments;
}
@@ -406,4 +406,9 @@ public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
analysis.isRollup()
);
}

public SegmentMetadataQueryConfig getConfig()
{
return this.config;
}
}
io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -86,8 +86,9 @@ public QueryRunner<SegmentAnalysis> createRunner(final Segment segment)
@Override
public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String, Object> responseContext)
{
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ.getQuery();
final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes());
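// Resolve default analysis types from the toolchest's broker config before analyzing the segment.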
SegmentMetadataQuery updatedQuery = ((SegmentMetadataQuery) inQ.getQuery())
.withFinalizedAnalysisTypes(toolChest.getConfig());
final SegmentAnalyzer analyzer = new SegmentAnalyzer(updatedQuery.getAnalysisTypes());
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(segment);
final long numRows = analyzer.numRows(segment);
long totalSize = 0;
@@ -98,7 +99,7 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String,
}

Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
ColumnIncluderator includerator = query.getToInclude();
ColumnIncluderator includerator = updatedQuery.getToInclude();
for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
final String columnName = entry.getKey();
final ColumnAnalysis column = entry.getValue();
@@ -110,12 +111,12 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String,
columns.put(columnName, column);
}
}
List<Interval> retIntervals = query.analyzingInterval() ?
List<Interval> retIntervals = updatedQuery.analyzingInterval() ?
Collections.singletonList(segment.getDataInterval()) : null;

final Map<String, AggregatorFactory> aggregators;
Metadata metadata = null;
if (query.hasAggregators()) {
if (updatedQuery.hasAggregators()) {
metadata = segment.asStorageAdapter().getMetadata();
if (metadata != null && metadata.getAggregators() != null) {
aggregators = Maps.newHashMap();
@@ -130,7 +131,7 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String,
}

final TimestampSpec timestampSpec;
if (query.hasTimestampSpec()) {
if (updatedQuery.hasTimestampSpec()) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
@@ -140,7 +141,7 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String,
}

final Granularity queryGranularity;
if (query.hasQueryGranularity()) {
if (updatedQuery.hasQueryGranularity()) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
@@ -150,7 +151,7 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, Map<String,
}

Boolean rollup = null;
if (query.hasRollup()) {
if (updatedQuery.hasRollup()) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
io/druid/query/metadata/metadata/SegmentMetadataQuery.java
@@ -32,6 +32,7 @@
import io.druid.query.TableDataSource;
import io.druid.query.UnionDataSource;
import io.druid.query.filter.DimFilter;
import io.druid.query.metadata.SegmentMetadataQueryConfig;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;
@@ -86,12 +87,6 @@ public byte[] getCacheKey()
JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT
);

public static final EnumSet<AnalysisType> DEFAULT_ANALYSIS_TYPES = EnumSet.of(
AnalysisType.CARDINALITY,
AnalysisType.INTERVAL,
AnalysisType.MINMAX
);

private final ColumnIncluderator toInclude;
private final boolean merge;
private final boolean usingDefaultInterval;
@@ -125,7 +120,7 @@ public SegmentMetadataQuery(
}
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
this.merge = merge == null ? false : merge;
this.analysisTypes = (analysisTypes == null) ? DEFAULT_ANALYSIS_TYPES : analysisTypes;
this.analysisTypes = analysisTypes;
Preconditions.checkArgument(
dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource,
"SegmentMetadataQuery only supports table or union datasource"
@@ -254,6 +249,23 @@ public Query<SegmentAnalysis> withColumns(ColumnIncluderator includerator)
return Druids.SegmentMetadataQueryBuilder.copy(this).toInclude(includerator).build();
}

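/**
 * Returns this query unchanged if it already specifies analysisTypes; otherwise returns
 * a copy whose analysisTypes are taken from the given broker config.
 */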
public SegmentMetadataQuery withFinalizedAnalysisTypes(SegmentMetadataQueryConfig config)
{
if (analysisTypes != null) {
return this;
}
return Druids.SegmentMetadataQueryBuilder
.copy(this)
.analysisTypes(config.getDefaultAnalysisTypes())
.build();
}

@Override
public List<Interval> getIntervals()
{
return this.getQuerySegmentSpec().getIntervals();
}

@Override
public String toString()
{
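
Putting the pieces together: a broker-configured default now flows into any segment metadata query that omits `analysisTypes`. Below is a minimal, hypothetical Java sketch of that behavior, assuming the classes from this PR are on the classpath; `Druids.newSegmentMetadataQueryBuilder()` is assumed to be the existing builder entry point in `io.druid.query.Druids`, and the datasource name is a placeholder.

```java
import io.druid.query.Druids;
import io.druid.query.metadata.SegmentMetadataQueryConfig;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;

import java.util.EnumSet;

public class DefaultAnalysisTypesSketch
{
  public static void main(String[] args)
  {
    // Broker-side config: a one-week defaultHistory plus a narrowed set of default
    // analysis types (in a real broker these are injected from runtime.properties).
    SegmentMetadataQueryConfig config = new SegmentMetadataQueryConfig("P1W");
    config.setDefaultAnalysisTypes(EnumSet.of(SegmentMetadataQuery.AnalysisType.MINMAX));

    // A query that does not set analysisTypes leaves them null ...
    SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
                                       .dataSource("wikipedia")
                                       .build();

    // ... until the toolchest or runner finalizes it against the config.
    SegmentMetadataQuery finalized = query.withFinalizedAnalysisTypes(config);
    System.out.println(finalized.getAnalysisTypes()); // prints [MINMAX]
  }
}
```

Because `computeCacheKey` also finalizes the query, a query that relies on the configured defaults produces the same cache key as one that spells the same types out explicitly.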