26 changes: 16 additions & 10 deletions docs/querying/segmentmetadataquery.md
@@ -62,7 +62,8 @@ There are several main parts to a segment metadata query:
|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.md)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "interval", "minmax"], but can be overridden using the [segment metadata query config](../configuration/index.md#segmentmetadata-query-config). See section [analysisTypes](#analysistypes) for more details.|no|
|lenientAggregatorMerge|If true, and if the "aggregators" analysisType is enabled, aggregators will be merged leniently. See below for details.|no|
|aggregatorMergeStrategy|The strategy Druid uses to merge aggregators across segments. Applies when the `aggregators` analysis type is enabled. Defaults to `strict`. Possible values are `strict`, `lenient`, and `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.|no|
|lenientAggregatorMerge|Deprecated. Use `aggregatorMergeStrategy` property instead. If true, and if the `aggregators` analysis type is enabled, Druid merges aggregators leniently.|no|
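
For illustration, here is a minimal Java sketch that sets the strategy through the segment metadata query builder. The datasource name is a placeholder, and the `dataSource(String)` overload on the builder is assumed:

```java
import org.apache.druid.query.Druids;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;

// Minimal sketch: a merged segment metadata query that resolves aggregator
// conflicts with the "latest" strategy. "wikipedia" is a placeholder datasource.
SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
                                   .dataSource("wikipedia")
                                   .merge(true)
                                   .aggregatorMergeStrategy(AggregatorMergeStrategy.LATEST)
                                   .build();
```

With `merge` set to true, per-segment results are combined into a single result and aggregator conflicts are resolved according to the chosen strategy.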

The format of the result is:

@@ -185,7 +186,7 @@ Currently, there is no API for retrieving this information.
* `aggregators` in the result will contain the list of aggregators usable for querying metric columns. This may be
null if the aggregators are unknown or unmergeable (if merging is enabled).

* Merging can be strict or lenient. See *lenientAggregatorMerge* below for details.
* Merging can be `strict`, `lenient`, or `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.

* The form of the result is a map of column name to aggregator.
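
As a hedged sketch of that shape, with hypothetical metric columns `count` and `added`:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

import java.util.Map;

// Sketch of the merged result shape: column name -> aggregator usable for querying it.
Map<String, AggregatorFactory> aggregators = ImmutableMap.of(
    "count", new CountAggregatorFactory("count"),
    "added", new LongSumAggregatorFactory("added", "added")
);
```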

@@ -194,15 +195,20 @@ null if the aggregators are unknown or unmergeable (if merging is enabled).
* `rollup` in the result is true/false/null.
* When merging is enabled, if some segments have rollup and others do not, the result is null.

## lenientAggregatorMerge
### aggregatorMergeStrategy

Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
two segments use incompatible aggregators for the same column (e.g. longSum changed to doubleSum).
two segments use incompatible aggregators for the same column, such as `longSum` changed to `doubleSum`.
Druid supports the following aggregator merge strategies:

Aggregators can be merged strictly (the default) or leniently. With strict merging, if there are any segments
with unknown aggregators, or any conflicts of any kind, the merged aggregators list will be `null`. With lenient
merging, segments with unknown aggregators will be ignored, and conflicts between aggregators will only null out
the aggregator for that particular column.
- `strict`: If there are any segments with unknown aggregators or any conflicts of any kind, the merged aggregators
list is `null`.
- `lenient`: Druid ignores segments with unknown aggregators. Conflicts between aggregators set the aggregator for that particular column to `null`.
- `latest`: In the event of conflicts between segments, Druid selects the aggregator from the most recent segment
for that particular column.

In particular, with lenient merging, it is possible for an individual column's aggregator to be `null`. This will not
occur with strict merging.
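
To make the lenient behavior concrete, here is a simplified, standalone Java sketch (not Druid's actual implementation; plain `equals()` stands in for the real conflict check):

```java
import org.apache.druid.query.aggregation.AggregatorFactory;

import java.util.HashMap;
import java.util.Map;

class LenientMergeSketch
{
  // Lenient, column-wise merging: a conflict nulls out only that column's
  // aggregator; columns known to a single segment are kept as-is.
  static Map<String, AggregatorFactory> mergeLeniently(
      Map<String, AggregatorFactory> segmentA,
      Map<String, AggregatorFactory> segmentB
  )
  {
    final Map<String, AggregatorFactory> merged = new HashMap<>(segmentA);
    for (Map.Entry<String, AggregatorFactory> entry : segmentB.entrySet()) {
      if (!merged.containsKey(entry.getKey())) {
        merged.put(entry.getKey(), entry.getValue());
      } else if (merged.get(entry.getKey()) != null && !merged.get(entry.getKey()).equals(entry.getValue())) {
        merged.put(entry.getKey(), null);
      }
    }
    return merged;
  }
}
```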

### lenientAggregatorMerge (deprecated)

Deprecated. Use [`aggregatorMergeStrategy`](#aggregatormergestrategy) instead.
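
In effect, the deprecated flag maps onto the new property as in this hedged sketch: `true` behaves like `lenient`, and `false` like the default `strict`.

```java
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;

class LenientFlagMapping
{
  // Hedged sketch: the boolean flag only ever selected between lenient and strict.
  static AggregatorMergeStrategy fromLenientFlag(boolean lenientAggregatorMerge)
  {
    return lenientAggregatorMerge ? AggregatorMergeStrategy.LENIENT : AggregatorMergeStrategy.STRICT;
  }
}
```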
15 changes: 13 additions & 2 deletions processing/src/main/java/org/apache/druid/query/Druids.java
@@ -36,6 +36,7 @@
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.ColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.scan.ScanQuery;
@@ -659,6 +660,7 @@ public static class SegmentMetadataQueryBuilder
private EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
private Boolean merge;
private Boolean lenientAggregatorMerge;
private AggregatorMergeStrategy aggregatorMergeStrategy;
private Boolean usingDefaultInterval;
private Map<String, Object> context;

@@ -670,6 +672,7 @@ public SegmentMetadataQueryBuilder()
analysisTypes = null;
merge = null;
lenientAggregatorMerge = null;
aggregatorMergeStrategy = null;
usingDefaultInterval = null;
context = null;
}
@@ -684,7 +687,8 @@ public SegmentMetadataQuery build()
context,
analysisTypes,
usingDefaultInterval,
lenientAggregatorMerge
lenientAggregatorMerge,
aggregatorMergeStrategy
);
}

@@ -696,7 +700,7 @@ public static SegmentMetadataQueryBuilder copy(SegmentMetadataQuery query)
.toInclude(query.getToInclude())
.analysisTypes(query.getAnalysisTypes())
.merge(query.isMerge())
.lenientAggregatorMerge(query.isLenientAggregatorMerge())
.aggregatorMergeStrategy(query.getAggregatorMergeStrategy())
.usingDefaultInterval(query.isUsingDefaultInterval())
.context(query.getContext());
}
@@ -761,12 +765,19 @@ public SegmentMetadataQueryBuilder merge(boolean merge)
return this;
}

@Deprecated
public SegmentMetadataQueryBuilder lenientAggregatorMerge(boolean lenientAggregatorMerge)
{
this.lenientAggregatorMerge = lenientAggregatorMerge;
return this;
}

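/**
 * Sets the strategy Druid uses to merge aggregators across segments. See {@link AggregatorMergeStrategy}.
 */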
public SegmentMetadataQueryBuilder aggregatorMergeStrategy(AggregatorMergeStrategy aggregatorMergeStrategy)
{
this.aggregatorMergeStrategy = aggregatorMergeStrategy;
return this;
}

public SegmentMetadataQueryBuilder usingDefaultInterval(boolean usingDefaultInterval)
{
this.usingDefaultInterval = usingDefaultInterval;
@@ -31,6 +31,8 @@
import com.google.inject.Inject;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
@@ -50,15 +52,16 @@
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.timeline.LogicalSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -139,10 +142,10 @@ private BinaryOperator<SegmentAnalysis> createMergeFn(final SegmentMetadataQuery
public BinaryOperator<SegmentAnalysis> createMergeFn(Query<SegmentAnalysis> query)
{
return (arg1, arg2) -> mergeAnalyses(
Iterables.getFirst(query.getDataSource().getTableNames(), null),
query.getDataSource().getTableNames(),
arg1,
arg2,
((SegmentMetadataQuery) query).isLenientAggregatorMerge()
((SegmentMetadataQuery) query).getAggregatorMergeStrategy()
);
}

@@ -205,7 +208,6 @@ public byte[] computeResultLevelCacheKey(SegmentMetadataQuery query)
// need to include query "merge" and "lenientAggregatorMerge" for result level cache key
return new CacheKeyBuilder(SEGMENT_METADATA_QUERY).appendByteArray(computeCacheKey(query))
.appendBoolean(query.isMerge())
.appendBoolean(query.isLenientAggregatorMerge())
.build();
}

@@ -254,10 +256,10 @@ public <T extends LogicalSegment> List<T> filterSegments(SegmentMetadataQuery qu

@VisibleForTesting
public static SegmentAnalysis mergeAnalyses(
@Nullable String dataSource,
Set<String> dataSources,
SegmentAnalysis arg1,
SegmentAnalysis arg2,
boolean lenientAggregatorMerge
AggregatorMergeStrategy aggregatorMergeStrategy
)
{
if (arg1 == null) {
@@ -268,16 +270,30 @@ public static SegmentAnalysis mergeAnalyses(
return arg1;
}

// Swap arg1, arg2 so the later-ending interval is first. This ensures we prefer the latest column order.
// We're preserving it so callers can see columns in their natural order.
if (dataSource != null) {
// This is a defensive check since SegmentMetadata query instantiation guarantees this
if (CollectionUtils.isNullOrEmpty(dataSources)) {
throw InvalidInput.exception("SegmentMetadata queries require at least one datasource.");
}

SegmentId mergedSegmentId = null;

for (String dataSource : dataSources) {
final SegmentId id1 = SegmentId.tryParse(dataSource, arg1.getId());
final SegmentId id2 = SegmentId.tryParse(dataSource, arg2.getId());

if (id1 != null && id2 != null && id2.getIntervalEnd().isAfter(id1.getIntervalEnd())) {
final SegmentAnalysis tmp = arg1;
arg1 = arg2;
arg2 = tmp;
// Swap arg1, arg2 so the later-ending interval is first. This ensures we prefer the latest column order.
// We're preserving it so callers can see columns in their natural order.
if (id1 != null && id2 != null) {
if (id2.getIntervalEnd().isAfter(id1.getIntervalEnd()) ||
(id2.getIntervalEnd().isEqual(id1.getIntervalEnd()) && id2.getPartitionNum() > id1.getPartitionNum())) {
mergedSegmentId = SegmentId.merged(dataSource, id2.getInterval(), id2.getPartitionNum());
final SegmentAnalysis tmp = arg1;
arg1 = arg2;
arg2 = tmp;
} else {
mergedSegmentId = SegmentId.merged(dataSource, id1.getInterval(), id1.getPartitionNum());
}
break;
}
}

@@ -309,7 +325,7 @@ public static SegmentAnalysis mergeAnalyses(

final Map<String, AggregatorFactory> aggregators = new HashMap<>();

if (lenientAggregatorMerge) {
if (AggregatorMergeStrategy.LENIENT == aggregatorMergeStrategy) {
// Merge each aggregator individually, ignoring nulls
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
@@ -331,7 +347,7 @@
}
}
}
} else {
} else if (AggregatorMergeStrategy.STRICT == aggregatorMergeStrategy) {
final AggregatorFactory[] aggs1 = arg1.getAggregators() != null
? arg1.getAggregators()
.values()
@@ -348,6 +364,20 @@
aggregators.put(aggregator.getName(), aggregator);
}
}
} else if (AggregatorMergeStrategy.LATEST == aggregatorMergeStrategy) {
// The segment analyses are already ordered above, where arg1 is the analysis pertaining to the latest interval
// followed by arg2.
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
final String aggregatorName = entry.getKey();
final AggregatorFactory aggregator = entry.getValue();
aggregators.putIfAbsent(aggregatorName, aggregator);
}
}
}
} else {
throw DruidException.defensive("[%s] merge strategy is not implemented.", aggregatorMergeStrategy);
}

final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec(
@@ -369,7 +399,7 @@
if (arg1.getId() != null && arg2.getId() != null && arg1.getId().equals(arg2.getId())) {
mergedId = arg1.getId();
} else {
mergedId = "merged";
mergedId = mergedSegmentId == null ? "merged" : mergedSegmentId.toString();
}

final Boolean rollup;
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.metadata.metadata;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.java.util.common.StringUtils;

public enum AggregatorMergeStrategy
{
STRICT,
LENIENT,
LATEST;

@JsonValue
@Override
public String toString()
{
return StringUtils.toLowerCase(this.name());
}

@JsonCreator
public static AggregatorMergeStrategy fromString(String name)
{
return valueOf(StringUtils.toUpperCase(name));
}
}
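
A quick usage sketch of the enum's case-insensitive parsing and lowercase string form:

```java
import org.apache.druid.query.metadata.metadata.AggregatorMergeStrategy;

public class AggregatorMergeStrategyExample
{
  public static void main(String[] args)
  {
    // fromString upper-cases its input before valueOf, so parsing is case-insensitive.
    AggregatorMergeStrategy strategy = AggregatorMergeStrategy.fromString("Latest");

    // toString lower-cases the constant name, so this prints "latest" --
    // the form used in query JSON via @JsonValue.
    System.out.println(strategy);
  }
}
```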