@@ -143,6 +143,7 @@ public Sequence<SegmentAnalysis> runSegmentMetadataQuery(Iterable<SegmentId> seg
null,
null,
null,
null,
false
)
)
9 changes: 7 additions & 2 deletions docs/querying/segmentmetadataquery.md
@@ -197,7 +197,12 @@ null if the aggregators are unknown or unmergeable (if merging is enabled).
* `rollup` in the result is true/false/null.
* When merging is enabled, if some segments are rolled up and others are not, the result is null.

### aggregatorMergeStrategy
### projections

* `projections` in the result contains the projections present in the segments.
* If any conflicting projections are identified, the conflicting projections are excluded from the merged result, while the non-conflicting ones are included (see the sketch below).
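
To make the exclusion rule concrete, here is a minimal sketch of how projection metadata from two segments could be combined, mirroring the merge logic added further down in this PR; the `AggregateProjectionMetadata` accessors and constructor are assumed from that diff, and the class name here is illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.druid.segment.AggregateProjectionMetadata;

class ProjectionMergeSketch
{
  // Keep only projections that exist in both segments with identical schemas,
  // summing their row counts; conflicting or one-sided projections are dropped.
  static Map<String, AggregateProjectionMetadata> merge(
      Map<String, AggregateProjectionMetadata> left,
      Map<String, AggregateProjectionMetadata> right
  )
  {
    if (left == null || right == null) {
      return null; // projections unknown on either side -> unknown overall
    }
    final Map<String, AggregateProjectionMetadata> merged = new HashMap<>();
    for (Map.Entry<String, AggregateProjectionMetadata> entry : left.entrySet()) {
      final AggregateProjectionMetadata other = right.get(entry.getKey());
      if (other != null && entry.getValue().getSchema().equals(other.getSchema())) {
        merged.put(
            entry.getKey(),
            new AggregateProjectionMetadata(
                entry.getValue().getSchema(),
                entry.getValue().getNumRows() + other.getNumRows()
            )
        );
      }
    }
    return merged;
  }
}
```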

## aggregatorMergeStrategy

Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
two segments use incompatible aggregators for the same column, such as `longSum` changed to `doubleSum`.
@@ -213,6 +218,6 @@ Druid supports the following aggregator merge strategies:
for that particular column.


### lenientAggregatorMerge (deprecated)
## lenientAggregatorMerge (deprecated)

Deprecated. Use [`aggregatorMergeStrategy`](#aggregatormergestrategy) instead.
@@ -603,6 +603,7 @@
"size":0,
"numRows":3702583,
"aggregators":null,
"projections":null,
"timestampSpec":null,
"queryGranularity":null,
"rollup":null
@@ -626,6 +627,7 @@
"size":0,
"numRows":3743002,
"aggregators":null,
"projections":null,
"timestampSpec":null,
"queryGranularity":null,
"rollup":null
@@ -649,6 +651,7 @@
"size":0,
"numRows":3502959,
"aggregators":null,
"projections":null,
"timestampSpec":null,
"queryGranularity":null,
"rollup":null
@@ -1433,6 +1433,7 @@
"size":0,
"numRows":4462111,
"aggregators":null,
"projections":null,
"timestampSpec":null,
"queryGranularity":null,
"rollup":null
5 changes: 5 additions & 0 deletions processing/pom.xml
@@ -453,6 +453,11 @@
<artifactId>system-rules</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
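The test-scoped `junit-jupiter-params` dependency added above enables JUnit 5 parameterized tests in this module. A minimal usage sketch follows; the test class and values are hypothetical and not taken from this PR.

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ExampleParameterizedTest
{
  // Runs the same assertion once per supplied value.
  @ParameterizedTest
  @ValueSource(ints = {1, 2, 3})
  void numRowsIsPositive(int numRows)
  {
    assertTrue(numRows > 0);
  }
}
```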
@@ -443,20 +443,20 @@ public static AggregatorFactory[] mergeAggregators(List<AggregatorFactory[]> agg
mergedAggregators.put(name, aggregator.getMergingFactory(other));
}
catch (AggregatorFactoryNotMergeableException ex) {
// Aggregator with the same name can't be merged, log it and return null early
log.warn(ex, "failed to merge aggregator factories");
mergedAggregators = null;
break;
return null;
}
} else {
mergedAggregators.put(name, aggregator);
}
}
} else {
mergedAggregators = null;
break;
// one of the segments being merged has unknown aggregators, return null early
return null;
}
}

return mergedAggregators == null ? null : mergedAggregators.values().toArray(new AggregatorFactory[0]);
return mergedAggregators.values().toArray(new AggregatorFactory[0]);
}
}
@@ -33,7 +33,6 @@
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
@@ -57,6 +56,7 @@
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.timeline.LogicalSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
@@ -281,7 +281,7 @@ public static SegmentAnalysis mergeAnalyses(

// This is a defensive check since SegmentMetadata query instantiation guarantees this
if (CollectionUtils.isNullOrEmpty(dataSources)) {
throw InvalidInput.exception("SegmentMetadata queries require at least one datasource.");
throw DruidException.defensive("SegmentMetadata queries require at least one datasource.");
}

SegmentId mergedSegmentId = null;
@@ -437,13 +437,34 @@ public static SegmentAnalysis mergeAnalyses(
rollup = null;
}

final Map<String, AggregateProjectionMetadata> projections;
if (arg1.getProjections() != null && arg2.getProjections() != null) {
projections = new HashMap<>();
// Merge two maps of AggregateProjectionMetadata, returning a new map with the same keys and merged metadata.
// If the schemas do not match, the metadata is not merged and the key is not included in the result.
for (String name : Sets.intersection(arg1.getProjections().keySet(), arg2.getProjections().keySet())) {
AggregateProjectionMetadata spec1 = arg1.getProjections().get(name);
AggregateProjectionMetadata spec2 = arg2.getProjections().get(name);
if (spec1.getSchema().equals(spec2.getSchema())) {
// If the schemas are equal, we can merge the metadata
projections.put(
name,
new AggregateProjectionMetadata(spec1.getSchema(), spec1.getNumRows() + spec2.getNumRows())
);
}
}
} else {
projections = null;
}

return new SegmentAnalysis(
mergedId,
newIntervals,
columns,
arg1.getSize() + arg2.getSize(),
arg1.getNumRows() + arg2.getNumRows(),
aggregators.isEmpty() ? null : aggregators,
(projections == null || projections.isEmpty()) ? null : projections,
timestampSpec,
queryGranularity,
rollup
@@ -460,6 +481,7 @@ public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
analysis.getSize(),
analysis.getNumRows(),
analysis.getAggregators(),
analysis.getProjections(),
analysis.getTimestampSpec(),
analysis.getQueryGranularity(),
analysis.isRollup()
@@ -46,6 +46,7 @@
import org.apache.druid.query.metadata.metadata.ColumnIncluderator;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.PhysicalSegmentInspector;
import org.apache.druid.segment.Segment;
@@ -57,10 +58,12 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
@@ -132,6 +135,20 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, ResponseCon
aggregators = null;
}

final Map<String, AggregateProjectionMetadata> projectionsMap;
if (updatedQuery.hasProjections()
&& ((metadata = Objects.isNull(metadata) ? getMetadata(segment) : metadata)) != null
&& metadata.getProjections() != null) {
projectionsMap = metadata.getProjections()
.stream()
.collect(Collectors.toUnmodifiableMap(
projectionMetadata -> projectionMetadata.getSchema().getName(),
p -> p
));
} else {
projectionsMap = null;
}

final TimestampSpec timestampSpec;
if (updatedQuery.hasTimestampSpec()) {
if (metadata == null) {
@@ -174,6 +191,7 @@ public Sequence<SegmentAnalysis> run(QueryPlus<SegmentAnalysis> inQ, ResponseCon
totalSize,
numRows,
aggregators,
projectionsMap,
timestampSpec,
queryGranularity,
rollup