-
Notifications
You must be signed in to change notification settings - Fork 3.8k
SegmentMetadataQuery support for returning aggregators. #2295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,9 +20,11 @@ | |
| package io.druid.query.metadata; | ||
|
|
||
| import com.fasterxml.jackson.core.type.TypeReference; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.base.Function; | ||
| import com.google.common.base.Functions; | ||
| import com.google.common.base.Predicate; | ||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.collect.Iterables; | ||
| import com.google.common.collect.Lists; | ||
| import com.google.common.collect.Maps; | ||
|
|
@@ -41,6 +43,8 @@ | |
| import io.druid.query.QueryRunner; | ||
| import io.druid.query.QueryToolChest; | ||
| import io.druid.query.ResultMergeQueryRunner; | ||
| import io.druid.query.aggregation.AggregatorFactory; | ||
| import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; | ||
| import io.druid.query.aggregation.MetricManipulationFn; | ||
| import io.druid.query.metadata.metadata.ColumnAnalysis; | ||
| import io.druid.query.metadata.metadata.SegmentAnalysis; | ||
|
|
@@ -51,7 +55,7 @@ | |
|
|
||
| import javax.annotation.Nullable; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
@@ -67,13 +71,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn | |
| @Override | ||
| public SegmentAnalysis apply(SegmentAnalysis analysis) | ||
| { | ||
| return new SegmentAnalysis( | ||
| analysis.getId(), | ||
| analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null, | ||
| analysis.getColumns(), | ||
| analysis.getSize(), | ||
| analysis.getNumRows() | ||
| ); | ||
| return finalizeAnalysis(analysis); | ||
| } | ||
| }; | ||
|
|
||
|
|
@@ -139,44 +137,7 @@ protected BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> createMerg | |
| @Override | ||
| public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2) | ||
| { | ||
| if (arg1 == null) { | ||
| return arg2; | ||
| } | ||
|
|
||
| if (arg2 == null) { | ||
| return arg1; | ||
| } | ||
|
|
||
| List<Interval> newIntervals = null; | ||
| if (query.analyzingInterval()) { | ||
| //List returned by arg1.getIntervals() is immutable, so a new list needs to | ||
| //be created. | ||
| newIntervals = new ArrayList<>(arg1.getIntervals()); | ||
| newIntervals.addAll(arg2.getIntervals()); | ||
| } | ||
|
|
||
| final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns(); | ||
| final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns(); | ||
| Map<String, ColumnAnalysis> columns = Maps.newTreeMap(); | ||
|
|
||
| Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet()); | ||
| for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) { | ||
| final String columnName = entry.getKey(); | ||
| columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName))); | ||
| rightColumnNames.remove(columnName); | ||
| } | ||
|
|
||
| for (String columnName : rightColumnNames) { | ||
| columns.put(columnName, rightColumns.get(columnName)); | ||
| } | ||
|
|
||
| return new SegmentAnalysis( | ||
| "merged", | ||
| newIntervals, | ||
| columns, | ||
| arg1.getSize() + arg2.getSize(), | ||
| arg1.getNumRows() + arg2.getNumRows() | ||
| ); | ||
| return mergeAnalyses(arg1, arg2, query.isLenientAggregatorMerge()); | ||
| } | ||
| }; | ||
| } | ||
|
|
@@ -284,4 +245,110 @@ public boolean apply(T input) | |
| ) | ||
| ); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public static SegmentAnalysis mergeAnalyses( | ||
| final SegmentAnalysis arg1, | ||
| final SegmentAnalysis arg2, | ||
| boolean lenientAggregatorMerge | ||
| ) | ||
| { | ||
| if (arg1 == null) { | ||
| return arg2; | ||
| } | ||
|
|
||
| if (arg2 == null) { | ||
| return arg1; | ||
| } | ||
|
|
||
| List<Interval> newIntervals = null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not just make this an empty list? It would make the following code easier to read.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh — you might be relying on newIntervals being null for a check later on, I guess. |
||
| if (arg1.getIntervals() != null) { | ||
| newIntervals = Lists.newArrayList(); | ||
| newIntervals.addAll(arg1.getIntervals()); | ||
| } | ||
| if (arg2.getIntervals() != null) { | ||
| if (newIntervals == null) { | ||
| newIntervals = Lists.newArrayList(); | ||
| } | ||
| newIntervals.addAll(arg2.getIntervals()); | ||
| } | ||
|
|
||
| final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns(); | ||
| final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns(); | ||
| Map<String, ColumnAnalysis> columns = Maps.newTreeMap(); | ||
|
|
||
| Set<String> rightColumnNames = Sets.newHashSet(rightColumns.keySet()); | ||
| for (Map.Entry<String, ColumnAnalysis> entry : leftColumns.entrySet()) { | ||
| final String columnName = entry.getKey(); | ||
| columns.put(columnName, entry.getValue().fold(rightColumns.get(columnName))); | ||
| rightColumnNames.remove(columnName); | ||
| } | ||
|
|
||
| for (String columnName : rightColumnNames) { | ||
| columns.put(columnName, rightColumns.get(columnName)); | ||
| } | ||
|
|
||
| final Map<String, AggregatorFactory> aggregators = Maps.newHashMap(); | ||
|
|
||
| if (lenientAggregatorMerge) { | ||
| // Merge each aggregator individually, ignoring nulls | ||
| for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) { | ||
| if (analysis.getAggregators() != null) { | ||
| for (AggregatorFactory aggregator : analysis.getAggregators().values()) { | ||
| AggregatorFactory merged = aggregators.get(aggregator.getName()); | ||
| if (merged != null) { | ||
| try { | ||
| merged = merged.getMergingFactory(aggregator); | ||
| } | ||
| catch (AggregatorFactoryNotMergeableException e) { | ||
| merged = null; | ||
| } | ||
| } else { | ||
| merged = aggregator; | ||
| } | ||
| aggregators.put(aggregator.getName(), merged); | ||
| } | ||
| } | ||
| } | ||
| } else { | ||
| final AggregatorFactory[] aggs1 = arg1.getAggregators() != null | ||
| ? arg1.getAggregators() | ||
| .values() | ||
| .toArray(new AggregatorFactory[arg1.getAggregators().size()]) | ||
| : null; | ||
| final AggregatorFactory[] aggs2 = arg2.getAggregators() != null | ||
| ? arg2.getAggregators() | ||
| .values() | ||
| .toArray(new AggregatorFactory[arg2.getAggregators().size()]) | ||
| : null; | ||
| final AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(Arrays.asList(aggs1, aggs2)); | ||
| if (merged != null) { | ||
| for (AggregatorFactory aggregator : merged) { | ||
| aggregators.put(aggregator.getName(), aggregator); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return new SegmentAnalysis( | ||
| "merged", | ||
| newIntervals, | ||
| columns, | ||
| arg1.getSize() + arg2.getSize(), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same question as below. |
||
| arg1.getNumRows() + arg2.getNumRows(), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it never possible to have any overlap here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Different rows in different segments count as two rows, so there can't be any overlap.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Never mind — this is just copied from the deleted code. |
||
| aggregators.isEmpty() ? null : aggregators | ||
| ); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis) | ||
| { | ||
| return new SegmentAnalysis( | ||
| analysis.getId(), | ||
| analysis.getIntervals() != null ? JodaUtils.condenseIntervals(analysis.getIntervals()) : null, | ||
| analysis.getColumns(), | ||
| analysis.getSize(), | ||
| analysis.getNumRows(), | ||
| analysis.getAggregators() | ||
| ); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,10 +39,12 @@ | |
| import io.druid.query.QueryRunnerFactory; | ||
| import io.druid.query.QueryToolChest; | ||
| import io.druid.query.QueryWatcher; | ||
| import io.druid.query.aggregation.AggregatorFactory; | ||
| import io.druid.query.metadata.metadata.ColumnAnalysis; | ||
| import io.druid.query.metadata.metadata.ColumnIncluderator; | ||
| import io.druid.query.metadata.metadata.SegmentAnalysis; | ||
| import io.druid.query.metadata.metadata.SegmentMetadataQuery; | ||
| import io.druid.segment.Metadata; | ||
| import io.druid.segment.Segment; | ||
| import org.joda.time.Interval; | ||
|
|
||
|
|
@@ -108,14 +110,30 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj | |
| } | ||
| List<Interval> retIntervals = query.analyzingInterval() ? Arrays.asList(segment.getDataInterval()) : null; | ||
|
|
||
| final Map<String, AggregatorFactory> aggregators; | ||
| if (query.hasAggregators()) { | ||
| final Metadata metadata = segment.asStorageAdapter().getMetadata(); | ||
| if (metadata != null && metadata.getAggregators() != null) { | ||
| aggregators = Maps.newHashMap(); | ||
| for (AggregatorFactory aggregator : metadata.getAggregators()) { | ||
| aggregators.put(aggregator.getName(), aggregator); | ||
| } | ||
| } else { | ||
| aggregators = null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can't we just set aggregators to null where it is declared?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I sort of prefer |
||
| } | ||
| } else { | ||
| aggregators = null; | ||
| } | ||
|
|
||
| return Sequences.simple( | ||
| Arrays.asList( | ||
| new SegmentAnalysis( | ||
| segment.getIdentifier(), | ||
| retIntervals, | ||
| columns, | ||
| totalSize, | ||
| numRows | ||
| numRows, | ||
| aggregators | ||
| ) | ||
| ) | ||
| ); | ||
|
|
@@ -168,10 +186,10 @@ public Sequence<SegmentAnalysis> call() throws Exception | |
| future.cancel(true); | ||
| throw new QueryInterruptedException("Query interrupted"); | ||
| } | ||
| catch(CancellationException e) { | ||
| catch (CancellationException e) { | ||
| throw new QueryInterruptedException("Query cancelled"); | ||
| } | ||
| catch(TimeoutException e) { | ||
| catch (TimeoutException e) { | ||
| log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); | ||
| future.cancel(true); | ||
| throw new QueryInterruptedException("Query timeout"); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no idea what this means.
Well, I know what it means, but anyone not super familiar with Druid reading this would not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It kind of means nothing, and we should probably remove it in the future (I think nobody depends on it).