diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java index 42017bcced1e..f08d618ff6c3 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DruidSchemaInternRowSignatureBenchmark.java @@ -143,6 +143,7 @@ public Sequence runSegmentMetadataQuery(Iterable seg null, null, null, + null, false ) ) diff --git a/docs/querying/segmentmetadataquery.md b/docs/querying/segmentmetadataquery.md index beae6ad8a4c7..de223b9fed2c 100644 --- a/docs/querying/segmentmetadataquery.md +++ b/docs/querying/segmentmetadataquery.md @@ -197,7 +197,12 @@ null if the aggregators are unknown or unmergeable (if merging is enabled). * `rollup` in the result is true/false/null. * When merging is enabled, if some are rollup, others are not, result is null. -### aggregatorMergeStrategy +### projections + +* `projections` in the result will contain the list of projections in segments. +* if any conflicting projections are identified, the conflicting one will be excluded, while the non-conflicting ones will be included. + +## aggregatorMergeStrategy Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if two segments use incompatible aggregators for the same column, such as `longSum` changed to `doubleSum`. @@ -213,6 +218,6 @@ Druid supports the following aggregator merge strategies: for that particular column. -### lenientAggregatorMerge (deprecated) +## lenientAggregatorMerge (deprecated) Deprecated. Use [`aggregatorMergeStrategy`](#aggregatormergestrategy) instead. diff --git a/integration-tests/src/test/resources/queries/twitterstream_queries.json b/integration-tests/src/test/resources/queries/twitterstream_queries.json index acbeb3e60b89..c83d616d789d 100644 --- a/integration-tests/src/test/resources/queries/twitterstream_queries.json +++ b/integration-tests/src/test/resources/queries/twitterstream_queries.json @@ -603,6 +603,7 @@ "size":0, "numRows":3702583, "aggregators":null, + "projections":null, "timestampSpec":null, "queryGranularity":null, "rollup":null @@ -626,6 +627,7 @@ "size":0, "numRows":3743002, "aggregators":null, + "projections":null, "timestampSpec":null, "queryGranularity":null, "rollup":null @@ -649,6 +651,7 @@ "size":0, "numRows":3502959, "aggregators":null, + "projections":null, "timestampSpec":null, "queryGranularity":null, "rollup":null diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json index 4cb4c0ec4857..54901737a913 100644 --- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json +++ b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json @@ -1433,6 +1433,7 @@ "size":0, "numRows":4462111, "aggregators":null, + "projections":null, "timestampSpec":null, "queryGranularity":null, "rollup":null diff --git a/processing/pom.xml b/processing/pom.xml index 6c5b76ca1d72..8d73e536c76d 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -453,6 +453,11 @@ system-rules test + + org.junit.jupiter + junit-jupiter-params + test + diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java index 3d34500b7109..b46330ebd3d8 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java @@ -443,20 +443,20 @@ public static AggregatorFactory[] mergeAggregators(List agg mergedAggregators.put(name, aggregator.getMergingFactory(other)); } catch (AggregatorFactoryNotMergeableException ex) { + // Aggregator with the same name can't be merged, log it and return null early log.warn(ex, "failed to merge aggregator factories"); - mergedAggregators = null; - break; + return null; } } else { mergedAggregators.put(name, aggregator); } } } else { - mergedAggregators = null; - break; + // one of the segments being merged has unknown aggregators, return null early + return null; } } - return mergedAggregators == null ? null : mergedAggregators.values().toArray(new AggregatorFactory[0]); + return mergedAggregators.values().toArray(new AggregatorFactory[0]); } } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 23e583214c84..4ad82123055b 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -33,7 +33,6 @@ import org.apache.druid.common.guava.CombiningSequence; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; -import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Comparators; @@ -57,6 +56,7 @@ import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; +import org.apache.druid.segment.AggregateProjectionMetadata; import org.apache.druid.timeline.LogicalSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.utils.CollectionUtils; @@ -281,7 +281,7 @@ public static SegmentAnalysis mergeAnalyses( // This is a defensive check since SegementMetadata query instantiation guarantees this if (CollectionUtils.isNullOrEmpty(dataSources)) { - throw InvalidInput.exception("SegementMetadata queries require at least one datasource."); + throw DruidException.defensive("SegementMetadata queries require at least one datasource."); } SegmentId mergedSegmentId = null; @@ -437,6 +437,26 @@ public static SegmentAnalysis mergeAnalyses( rollup = null; } + final Map projections; + if (arg1.getProjections() != null && arg2.getProjections() != null) { + projections = new HashMap<>(); + // Merge two maps of AggregateProjectionMetadata, returning a new map with the same keys and merged metadata. + // If the schemas do not match, the metadata is not merged and the key is not included in the result. + for (String name : Sets.intersection(arg1.getProjections().keySet(), arg2.getProjections().keySet())) { + AggregateProjectionMetadata spec1 = arg1.getProjections().get(name); + AggregateProjectionMetadata spec2 = arg2.getProjections().get(name); + if (spec1.getSchema().equals(spec2.getSchema())) { + // If the schemas are equal, we can merge the metadata + projections.put( + name, + new AggregateProjectionMetadata(spec1.getSchema(), spec1.getNumRows() + spec2.getNumRows()) + ); + } + } + } else { + projections = null; + } + return new SegmentAnalysis( mergedId, newIntervals, @@ -444,6 +464,7 @@ public static SegmentAnalysis mergeAnalyses( arg1.getSize() + arg2.getSize(), arg1.getNumRows() + arg2.getNumRows(), aggregators.isEmpty() ? null : aggregators, + (projections == null || projections.isEmpty()) ? null : projections, timestampSpec, queryGranularity, rollup @@ -460,6 +481,7 @@ public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis) analysis.getSize(), analysis.getNumRows(), analysis.getAggregators(), + analysis.getProjections(), analysis.getTimestampSpec(), analysis.getQueryGranularity(), analysis.isRollup() diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 664b5d32bdce..6320df7d06c0 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -46,6 +46,7 @@ import org.apache.druid.query.metadata.metadata.ColumnIncluderator; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; +import org.apache.druid.segment.AggregateProjectionMetadata; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.PhysicalSegmentInspector; import org.apache.druid.segment.Segment; @@ -57,10 +58,12 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory { @@ -132,6 +135,20 @@ public Sequence run(QueryPlus inQ, ResponseCon aggregators = null; } + final Map projectionsMap; + if (updatedQuery.hasProjections() + && ((metadata = Objects.isNull(metadata) ? getMetadata(segment) : metadata)) != null + && metadata.getProjections() != null) { + projectionsMap = metadata.getProjections() + .stream() + .collect(Collectors.toUnmodifiableMap( + projectionMetadata -> projectionMetadata.getSchema().getName(), + p -> p + )); + } else { + projectionsMap = null; + } + final TimestampSpec timestampSpec; if (updatedQuery.hasTimestampSpec()) { if (metadata == null) { @@ -174,6 +191,7 @@ public Sequence run(QueryPlus inQ, ResponseCon totalSize, numRows, aggregators, + projectionsMap, timestampSpec, queryGranularity, rollup diff --git a/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentAnalysis.java b/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentAnalysis.java index 13576a6a11f0..8ba23be919e0 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentAnalysis.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentAnalysis.java @@ -24,12 +24,16 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.AggregateProjectionMetadata; +import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; public class SegmentAnalysis implements Comparable { @@ -52,6 +56,7 @@ public class SegmentAnalysis implements Comparable private final long size; private final long numRows; private final Map aggregators; + private final Map projections; private final TimestampSpec timestampSpec; private final Granularity queryGranularity; private final Boolean rollup; @@ -64,6 +69,7 @@ public SegmentAnalysis( @JsonProperty("size") long size, @JsonProperty("numRows") long numRows, @JsonProperty("aggregators") Map aggregators, + @JsonProperty("projections") Map projections, @JsonProperty("timestampSpec") TimestampSpec timestampSpec, @JsonProperty("queryGranularity") Granularity queryGranularity, @JsonProperty("rollup") Boolean rollup @@ -75,6 +81,7 @@ public SegmentAnalysis( this.size = size; this.numRows = numRows; this.aggregators = aggregators; + this.projections = projections; this.timestampSpec = timestampSpec; this.queryGranularity = queryGranularity; this.rollup = rollup; @@ -134,6 +141,12 @@ public Map getAggregators() return aggregators; } + @JsonProperty + public Map getProjections() + { + return projections; + } + @Override public String toString() { @@ -144,6 +157,7 @@ public String toString() ", size=" + size + ", numRows=" + numRows + ", aggregators=" + aggregators + + ", projections=" + projections + ", timestampSpec=" + timestampSpec + ", queryGranularity=" + queryGranularity + ", rollup=" + rollup + @@ -170,6 +184,7 @@ public boolean equals(Object o) Objects.equals(interval, that.interval) && Objects.equals(columns, that.columns) && Objects.equals(aggregators, that.aggregators) && + Objects.equals(projections, that.projections) && Objects.equals(timestampSpec, that.timestampSpec) && Objects.equals(queryGranularity, that.queryGranularity); } @@ -181,7 +196,18 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity, rollup); + return Objects.hash( + id, + interval, + columns, + size, + numRows, + aggregators, + projections, + timestampSpec, + queryGranularity, + rollup + ); } @Override @@ -189,4 +215,103 @@ public int compareTo(SegmentAnalysis rhs) { return id.compareTo(rhs.getId()); } + + /** + * Helper class to build {@link SegmentAnalysis} objects. + */ + public static class Builder + { + private final String segmentId; + private final LinkedHashMap columns = new LinkedHashMap<>(); + private final Map aggregators = new LinkedHashMap<>(); + private final Map projections = new LinkedHashMap<>(); + + private List intervals = null; + private Optional size = Optional.empty(); + private Optional numRows = Optional.empty(); + private Optional rollup = Optional.empty(); + + public Builder(String segmentId) + { + this.segmentId = segmentId; + } + + public Builder(SegmentId segmentId) + { + this.segmentId = segmentId.toString(); + } + + public Builder size(int size) + { + if (this.size.isEmpty()) { + this.size = Optional.of(size); + } else { + throw new IllegalStateException("Size is already set: " + this.size.get()); + } + return this; + } + + public Builder numRows(int numRows) + { + if (this.numRows.isEmpty()) { + this.numRows = Optional.of(numRows); + } else { + throw new IllegalStateException("NumRows is already set: " + this.numRows.get()); + } + return this; + } + + public Builder rollup(boolean rollup) + { + if (this.rollup.isEmpty()) { + this.rollup = Optional.of(rollup); + } else { + throw new IllegalStateException("Rollup is already set: " + this.rollup.get()); + } + return this; + } + + public Builder interval(Interval interval) + { + if (this.intervals == null) { + this.intervals = new ArrayList<>(); + } + this.intervals.add(interval); + return this; + } + + public Builder column(String columnName, ColumnAnalysis columnAnalysis) + { + this.columns.put(columnName, columnAnalysis); + return this; + } + + public Builder aggregator(String name, AggregatorFactory aggregatorFactory) + { + this.aggregators.put(name, aggregatorFactory); + return this; + } + + public Builder projection(String name, AggregateProjectionMetadata projection) + { + this.projections.put(name, projection); + return this; + } + + public SegmentAnalysis build() + { + return new SegmentAnalysis( + segmentId, + intervals, + columns, + size.orElse(0), + numRows.orElse(0), + aggregators.isEmpty() ? null : aggregators, + projections.isEmpty() ? null : projections, + null, + null, + rollup.orElse(null) + ); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentMetadataQuery.java index 5458fc65de73..bf407e0cc9ac 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentMetadataQuery.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/metadata/SegmentMetadataQuery.java @@ -59,7 +59,8 @@ public enum AnalysisType implements Cacheable MINMAX, TIMESTAMPSPEC, QUERYGRANULARITY, - ROLLUP; + ROLLUP, + PROJECTIONS; @JsonValue @Override @@ -192,6 +193,11 @@ public boolean hasAggregators() return analysisTypes.contains(AnalysisType.AGGREGATORS); } + public boolean hasProjections() + { + return analysisTypes.contains(AnalysisType.PROJECTIONS); + } + public boolean hasTimestampSpec() { return analysisTypes.contains(AnalysisType.TIMESTAMPSPEC); diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index 4afd7afdc0e3..d337dd0d0aa1 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -130,7 +130,7 @@ public IndexMergerV9( private File makeIndexFiles( final List adapters, - final @Nullable AggregatorFactory[] metricAggs, + final @Nullable Metadata segmentMetadata, final File outDir, final ProgressIndicator progress, final List mergedDimensionsWithTime, // has both explicit and implicit dimensions, as well as __time @@ -144,43 +144,11 @@ private File makeIndexFiles( progress.start(); progress.progress(); - List metadataList = Lists.transform(adapters, IndexableAdapter::getMetadata); - // Merged dimensions without __time. List mergedDimensions = mergedDimensionsWithTime.stream() .filter(dim -> !ColumnHolder.TIME_COLUMN_NAME.equals(dim)) .collect(Collectors.toList()); - - Metadata segmentMetadata; - if (metricAggs != null) { - AggregatorFactory[] combiningMetricAggs = new AggregatorFactory[metricAggs.length]; - for (int i = 0; i < metricAggs.length; i++) { - combiningMetricAggs[i] = metricAggs[i].getCombiningFactory(); - } - segmentMetadata = Metadata.merge( - metadataList, - combiningMetricAggs - ); - } else { - segmentMetadata = Metadata.merge( - metadataList, - null - ); - } - - if (segmentMetadata != null - && segmentMetadata.getOrdering() != null - && segmentMetadata.getOrdering() - .stream() - .noneMatch(orderBy -> ColumnHolder.TIME_COLUMN_NAME.equals(orderBy.getColumnName()))) { - throw DruidException.defensive( - "sortOrder[%s] must include[%s]", - segmentMetadata.getOrdering(), - ColumnHolder.TIME_COLUMN_NAME - ); - } - Closer closer = Closer.create(); try { final FileSmoosher v9Smoosher = new FileSmoosher(outDir); @@ -302,8 +270,12 @@ private File makeIndexFiles( progress.stopSection(section); - if (segmentMetadata != null && !CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections())) { - segmentMetadata = makeProjections( + // Recompute the projections. + final Metadata finalMetadata; + if (segmentMetadata == null || CollectionUtils.isNullOrEmpty(segmentMetadata.getProjections())) { + finalMetadata = segmentMetadata; + } else { + finalMetadata = makeProjections( v9Smoosher, segmentMetadata.getProjections(), adapters, @@ -330,7 +302,7 @@ private File makeIndexFiles( mergers, dimensionsSpecInspector ); - makeMetadataBinary(v9Smoosher, progress, segmentMetadata); + makeMetadataBinary(v9Smoosher, progress, finalMetadata); v9Smoosher.close(); progress.stop(); @@ -677,6 +649,7 @@ private void makeMetricsColumns( { makeMetricsColumns(v9Smoosher, progress, mergedMetrics, metricsTypes, metWriters, indexSpec, ""); } + private void makeMetricsColumns( final FileSmoosher v9Smoosher, final ProgressIndicator progress, @@ -1145,6 +1118,9 @@ public File mergeQueryableIndex( ); } + /** + * The indexes here must have the same {@link Metadata}, otherwise an error would be thrown. + */ @Override public File merge( List indexes, @@ -1377,9 +1353,28 @@ private File merge( rowMergerFn = MergingRowIterator::new; } + List metadataList = Lists.transform(indexes, IndexableAdapter::getMetadata); + AggregatorFactory[] combiningMetricAggs = new AggregatorFactory[sortedMetricAggs.length]; + for (int i = 0; i < sortedMetricAggs.length; i++) { + combiningMetricAggs[i] = sortedMetricAggs[i].getCombiningFactory(); + } + final Metadata segmentMetadata = Metadata.merge(metadataList, combiningMetricAggs); + + if (segmentMetadata != null + && segmentMetadata.getOrdering() != null + && segmentMetadata.getOrdering() + .stream() + .noneMatch(orderBy -> ColumnHolder.TIME_COLUMN_NAME.equals(orderBy.getColumnName()))) { + throw DruidException.defensive( + "sortOrder[%s] must include[%s]", + segmentMetadata.getOrdering(), + ColumnHolder.TIME_COLUMN_NAME + ); + } + return makeIndexFiles( indexes, - sortedMetricAggs, + segmentMetadata, outDir, progress, mergedDimensionsWithTime, diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java index 03b5fe7e45e3..678c5c2d3669 100644 --- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java +++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java @@ -197,6 +197,7 @@ public static Collection dataFeeder() null, null, null, + null, null ); @@ -248,6 +249,7 @@ public static Collection dataFeeder() null, null, null, + null, null ); diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java index 4f68c9e059dd..56800c4c3c41 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalysisTest.java @@ -22,13 +22,19 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; +import org.apache.druid.segment.AggregateProjectionMetadata; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.junit.Assert; import org.junit.Test; @@ -37,6 +43,15 @@ public class SegmentAnalysisTest { + + @Test + public void testEquals() + { + EqualsVerifier.forClass(SegmentAnalysis.class) + .usingGetClass() + .verify(); + } + @Test public void testSerde() throws Exception { @@ -67,6 +82,27 @@ public void testSerde() throws Exception 1, 2, ImmutableMap.of("cnt", new CountAggregatorFactory("cnt")), + ImmutableMap.of("channel_added_hourly", new AggregateProjectionMetadata( + new AggregateProjectionMetadata.Schema( + "channel_added_hourly", + Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, + VirtualColumns.create( + Granularities.toVirtualColumn( + Granularities.HOUR, + Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME + ) + ), + ImmutableList.of(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel"), + new AggregatorFactory[] { + new LongSumAggregatorFactory("sum_added", "added") + }, + ImmutableList.of( + OrderBy.ascending(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME), + OrderBy.ascending("channel") + ) + ), + 16 + )), new TimestampSpec(null, null, null), Granularities.SECOND, true diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java index 4afb202b4331..69be9165ac9d 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java @@ -22,15 +22,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; +import org.apache.druid.query.OrderBy; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -43,6 +44,8 @@ import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; import org.apache.druid.query.spec.LegacySegmentSpec; +import org.apache.druid.segment.AggregateProjectionMetadata; +import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ValueType; import org.apache.druid.timeline.LogicalSegment; @@ -51,29 +54,53 @@ import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; public class SegmentMetadataQueryQueryToolChestTest { private static final DataSource TEST_DATASOURCE = new TableDataSource("dummy"); - private static final SegmentId TEST_SEGMENT_ID1 = SegmentId.of( - TEST_DATASOURCE.toString(), - Intervals.of("2020-01-01/2020-01-02"), - "test", - 0 + private static final Interval INTERVAL_2020 = Intervals.of("2020-01-01/2020-01-02"); + private static final Interval INTERVAL_2021 = Intervals.of("2021-01-01/2021-01-02"); + private static final SegmentId TEST_SEGMENT_ID1 = SegmentId.of(TEST_DATASOURCE.toString(), INTERVAL_2020, "test", 0); + private static final SegmentId TEST_SEGMENT_ID2 = SegmentId.of(TEST_DATASOURCE.toString(), INTERVAL_2021, "test", 0); + + private static final AggregateProjectionMetadata.Schema PROJECTION_CHANNEL_ADDED_HOURLY = new AggregateProjectionMetadata.Schema( + "name1-does-not-matter", + Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, + VirtualColumns.create(Granularities.toVirtualColumn( + Granularities.HOUR, + Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME + )), + ImmutableList.of(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel"), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("channel_sum", "channel") + }, + ImmutableList.of( + OrderBy.ascending(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME), + OrderBy.ascending("channel") + ) ); - private static final SegmentId TEST_SEGMENT_ID2 = SegmentId.of( - TEST_DATASOURCE.toString(), - Intervals.of("2021-01-01/2021-01-02"), - "test", - 0 + private static final AggregateProjectionMetadata.Schema PROJECTION_CHANNEL_ADDED_DAILY = new AggregateProjectionMetadata.Schema( + "name2-does-not-matter", + Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, + VirtualColumns.create(Granularities.toVirtualColumn( + Granularities.DAY, + Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME + )), + ImmutableList.of(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME, "channel"), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("channel_sum", "channel") + }, + ImmutableList.of( + OrderBy.ascending(Granularities.GRANULARITY_VIRTUAL_COLUMN_NAME), + OrderBy.ascending("channel") + ) ); @Test @@ -99,32 +126,25 @@ public void testCacheStrategy() throws Exception byte[] actualKey = strategy.computeCacheKey(query); Assert.assertArrayEquals(expectedKey, actualKey); - SegmentAnalysis result = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - ImmutableList.of(Intervals.of("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")), - new LinkedHashMap<>( - ImmutableMap.of( - "placement", - new ColumnAnalysis( - ColumnType.STRING, - ValueType.STRING.name(), - true, - false, - 10881, - 1, - "preferred", - "preferred", - null - ) + SegmentAnalysis result = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .interval(Intervals.of("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")) + .column( + "placement", + new ColumnAnalysis( + ColumnType.STRING, + ValueType.STRING.name(), + true, + false, + 10881, + 1, + "preferred", + "preferred", + null ) - ), - 71982, - 100, - null, - null, - null, - null - ); + ) + .size(71982) + .numRows(100) + .build(); Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result); @@ -139,980 +159,302 @@ public void testCacheStrategy() throws Exception Assert.assertEquals(result, fromCacheResult); } - @Test - public void testMergeAggregators() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeAggregators(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeStrict(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLenient(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeEarliest(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest(analysis1, analysis2) - ); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("baz", new DoubleSumAggregatorFactory("baz", "baz")) + .build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + + final SegmentAnalysis expected = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .aggregator("baz", new DoubleSumAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expected, mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy)); } - @Test - public void testMergeAggregatorsWithIntervals() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeAggregatorsWithIntervals(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - ImmutableList.of(TEST_SEGMENT_ID1.getInterval()), - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - ImmutableList.of(TEST_SEGMENT_ID2.getInterval()), - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ); - - final List expectedIntervals = new ArrayList<>(); - expectedIntervals.addAll(analysis1.getIntervals()); - expectedIntervals.addAll(analysis2.getIntervals()); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - expectedIntervals, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeStrict(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - expectedIntervals, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLenient(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - expectedIntervals, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "baz", new DoubleSumAggregatorFactory("baz", "baz"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ), - mergeEarliest(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - expectedIntervals, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new DoubleSumAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest(analysis1, analysis2) - ); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .interval(TEST_SEGMENT_ID1.getInterval()) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("baz", new DoubleSumAggregatorFactory("baz", "baz")) + .build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .interval(TEST_SEGMENT_ID2.getInterval()) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + + SegmentAnalysis expected = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .interval(TEST_SEGMENT_ID1.getInterval()) + .interval(TEST_SEGMENT_ID2.getInterval()) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .aggregator("baz", new DoubleSumAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expected, mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy)); } - @Test - public void testMergeAggregatorsOneNull() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeAggregatorsOneNullStrict(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ); + Assume.assumeTrue(aggregatorMergeStrategy == AggregatorMergeStrategy.STRICT); - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), - mergeStrict(analysis1, analysis2) - ); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ), - mergeLenient(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ), - mergeEarliest(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ), - mergeLatest(analysis1, analysis2) - ); + final SegmentAnalysis expectedNullAggregators = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged").build(); + Assert.assertEquals(expectedNullAggregators, mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy)); } - @Test - public void testMergeAggregatorsAllNull() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeAggregatorsOneNullNotStrict(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), - mergeStrict(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), - mergeLenient(analysis1, analysis2) - ); + Assume.assumeTrue(aggregatorMergeStrategy != AggregatorMergeStrategy.STRICT); + + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + + final SegmentAnalysis expected = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + Assert.assertEquals(expected, mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy)); + } - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), - mergeEarliest(analysis1, analysis2) - ); + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeAggregatorsAllNull(AggregatorMergeStrategy aggregatorMergeStrategy) + { + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2).build(); - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), - mergeLatest(analysis1, analysis2) - ); + final SegmentAnalysis expected = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged").build(); + Assert.assertEquals(expected, mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy)); } @Test public void testMergeAggregatorsConflict() { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ); - - final Map expectedLenient = new HashMap<>(); - expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo")); - expectedLenient.put("bar", null); - expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz")); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), - mergeStrict(analysis1, analysis2) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), - mergeLenient(analysis1, analysis2) - ); - + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleMaxAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + + // Test strict merge, returns null aggregators as there's a conflict on "bar" + final SegmentAnalysis expectedStrict = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged").build(); + Assert.assertEquals(expectedStrict, mergeWithStrategy(analysis1, analysis2, AggregatorMergeStrategy.STRICT)); + + // Test lenient merge, returns a map with null for "bar" as it has a conflict + final SegmentAnalysis expectedLenient = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", null) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedLenient, mergeLenient(analysis1, analysis2)); // Simulate multi-level lenient merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), - mergeLenient( - mergeLenient(analysis1, analysis2), - mergeLenient(analysis1, analysis2) - ) + expectedLenient, + mergeLenient(mergeLenient(analysis1, analysis2), mergeLenient(analysis1, analysis2)) ); - // Simulate multi-level lenient merge (unmerged first) - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), - mergeLenient( - analysis1, - mergeLenient(analysis1, analysis2) - ) - ); - + Assert.assertEquals(expectedLenient, mergeLenient(analysis1, mergeLenient(analysis1, analysis2))); // Simulate multi-level lenient merge (unmerged second) - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), - mergeLenient( - mergeLenient(analysis1, analysis2), - analysis1 - ) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeEarliest(analysis1, analysis2) - ); - + Assert.assertEquals(expectedLenient, mergeLenient(mergeLenient(analysis1, analysis2), analysis1)); + + // Test earliest merge, returns a map with "bar" as DoubleSumAggregatorFactory since analysis1 is earlier + final SegmentAnalysis expectedEarliest = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedEarliest, mergeEarliest(analysis1, analysis2)); // Simulate multi-level earliest merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeEarliest( - mergeEarliest(analysis1, analysis2), - mergeEarliest(analysis1, analysis2) - ) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest(analysis1, analysis2) + expectedEarliest, + mergeEarliest(mergeEarliest(analysis1, analysis2), mergeEarliest(analysis1, analysis2)) ); + // Test latest merge, returns a map with "bar" as DoubleMaxAggregatorFactory since analysis2 is later + final SegmentAnalysis expectedLatest = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleMaxAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedLatest, mergeLatest(analysis1, analysis2)); // Simulate multi-level latest merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest( - mergeLatest(analysis1, analysis2), - mergeLatest(analysis1, analysis2) - ) + expectedLatest, + mergeLatest(mergeLatest(analysis1, analysis2), mergeLatest(analysis1, analysis2)) ); } @Test public void testMergeAggregatorsConflictWithDifferentOrder() { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ); - - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ); - - final Map expectedLenient = new HashMap<>(); - expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo")); - expectedLenient.put("bar", null); - expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz")); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleMaxAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + + // Test strict merge, returns null aggregators as there's a conflict on "bar" + Assert.assertEquals( + new SegmentAnalysis.Builder("dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .build(), mergeStrict(analysis1, analysis2) ); + // Test lenient merge, returns a map with null for "bar" as it has a conflict + final SegmentAnalysis expectedLenient = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", null) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), + expectedLenient, mergeLenient(analysis1, analysis2) ); - // Simulate multi-level lenient merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), + expectedLenient, mergeLenient( mergeLenient(analysis1, analysis2), mergeLenient(analysis1, analysis2) ) ); - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeEarliest(analysis1, analysis2) - ); - + // Test earliest merge, returns a map with "bar" as DoubleMaxAggregatorFactory since analysis2 is earlier + final SegmentAnalysis expectedEarliest = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleMaxAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedEarliest, mergeEarliest(analysis1, analysis2)); // Simulate multi-level earliest merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeEarliest( - mergeEarliest(analysis1, analysis2), - mergeEarliest(analysis1, analysis2) - ) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest(analysis1, analysis2) + expectedEarliest, + mergeEarliest(mergeEarliest(analysis1, analysis2), mergeEarliest(analysis1, analysis2)) ); + // Test latest merge, returns a map with "bar" as DoubleSumAggregatorFactory since analysis1 is later + final SegmentAnalysis expectedLatest = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedLatest, mergeLatest(analysis1, analysis2)); // Simulate multi-level latest merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest( - mergeLatest(analysis1, analysis2), - mergeLatest(analysis1, analysis2) - ) + expectedLatest, + mergeLatest(mergeLatest(analysis1, analysis2), mergeLatest(analysis1, analysis2)) ); } @Test public void testMergeAggregatorsConflictWithEqualSegmentIntervalsAndDifferentPartitions() { - final SegmentId segmentId1 = SegmentId.of( - TEST_DATASOURCE.toString(), - Intervals.of("2023-01-01/2023-01-02"), - "test", - 1 - ); - final SegmentId segmentId2 = SegmentId.of( - TEST_DATASOURCE.toString(), - Intervals.of("2023-01-01/2023-01-02"), - "test", - 2 - ); + Interval interval = Intervals.of("2023-01-01/2023-01-02"); + final SegmentId segmentId1 = SegmentId.of(TEST_DATASOURCE.toString(), interval, "test", 1); + final SegmentId segmentId2 = SegmentId.of(TEST_DATASOURCE.toString(), interval, "test", 2); - final SegmentAnalysis analysis1 = new SegmentAnalysis( - segmentId1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ); - - final SegmentAnalysis analysis2 = new SegmentAnalysis( - segmentId2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(segmentId1) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); - final Map expectedLenient = new HashMap<>(); - expectedLenient.put("foo", new LongSumAggregatorFactory("foo", "foo")); - expectedLenient.put("bar", null); - expectedLenient.put("baz", new LongMaxAggregatorFactory("baz", "baz")); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(segmentId2) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleMaxAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + // Test strict merge, returns null aggregators as there's a conflict on "bar" Assert.assertEquals( - new SegmentAnalysis( - "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2", - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ), + new SegmentAnalysis.Builder("dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2") + .build(), mergeStrict(analysis1, analysis2) ); - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), - mergeLenient(analysis1, analysis2) - ); - + // Test lenient merge, returns a map with null for "bar" as it has a conflict + SegmentAnalysis expectedLenient = new SegmentAnalysis.Builder( + "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", null) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedLenient, mergeLenient(analysis1, analysis2)); // Simulate multi-level lenient merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2", - null, - new LinkedHashMap<>(), - 0, - 0, - expectedLenient, - null, - null, - null - ), - mergeLenient( - mergeLenient(analysis1, analysis2), - mergeLenient(analysis1, analysis2) - ) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeEarliest(analysis1, analysis2) + expectedLenient, + mergeLenient(mergeLenient(analysis1, analysis2), mergeLenient(analysis1, analysis2)) ); + // Test earliest merge, returns a map with "bar" as DoubleSumAggregatorFactory since analysis1 has the earlier partition + SegmentAnalysis expectedEarliest = new SegmentAnalysis.Builder( + "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedEarliest, mergeEarliest(analysis1, analysis2)); // Simulate multi-level earliest merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeEarliest( - mergeEarliest(analysis1, analysis2), - mergeEarliest(analysis1, analysis2) - ) - ); - - Assert.assertEquals( - new SegmentAnalysis( - "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest(analysis1, analysis2) + expectedEarliest, + mergeEarliest(mergeEarliest(analysis1, analysis2), mergeEarliest(analysis1, analysis2)) ); + // Test latest merge, returns a map with "bar" as DoubleMaxAggregatorFactory since analysis2 has the later partition + SegmentAnalysis expectedLatest = new SegmentAnalysis.Builder( + "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleMaxAggregatorFactory("bar", "bar")) + .aggregator("baz", new LongMaxAggregatorFactory("baz", "baz")) + .build(); + Assert.assertEquals(expectedLatest, mergeLatest(analysis1, analysis2)); // Simulate multi-level latest merge Assert.assertEquals( - new SegmentAnalysis( - "dummy_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_merged_2", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ), - mergeLatest( - mergeLatest(analysis1, analysis2), - mergeLatest(analysis1, analysis2) - ) + expectedLatest, + mergeLatest(mergeLatest(analysis1, analysis2), mergeLatest(analysis1, analysis2)) ); } @@ -1161,115 +503,29 @@ public Interval getTrueInterval() } @SuppressWarnings("ArgumentParameterSwap") - @Test - public void testMergeRollup() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeRollup(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - false - ); - final SegmentAnalysis analysis3 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - false - ); - final SegmentAnalysis analysis4 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - true - ); - final SegmentAnalysis analysis5 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - true - ); - - Assert.assertNull(mergeStrict(analysis1, analysis2).isRollup()); - Assert.assertNull(mergeStrict(analysis1, analysis4).isRollup()); - Assert.assertNull(mergeStrict(analysis2, analysis4).isRollup()); - Assert.assertFalse(mergeStrict(analysis2, analysis3).isRollup()); - Assert.assertTrue(mergeStrict(analysis4, analysis5).isRollup()); - - Assert.assertNull(mergeLenient(analysis1, analysis2).isRollup()); - Assert.assertNull(mergeLenient(analysis1, analysis4).isRollup()); - Assert.assertNull(mergeLenient(analysis2, analysis4).isRollup()); - Assert.assertFalse(mergeLenient(analysis2, analysis3).isRollup()); - Assert.assertTrue(mergeLenient(analysis4, analysis5).isRollup()); - - Assert.assertNull(mergeEarliest(analysis1, analysis2).isRollup()); - Assert.assertNull(mergeEarliest(analysis1, analysis4).isRollup()); - Assert.assertNull(mergeEarliest(analysis2, analysis4).isRollup()); - Assert.assertFalse(mergeEarliest(analysis2, analysis3).isRollup()); - Assert.assertTrue(mergeEarliest(analysis4, analysis5).isRollup()); - - Assert.assertNull(mergeLatest(analysis1, analysis2).isRollup()); - Assert.assertNull(mergeLatest(analysis1, analysis4).isRollup()); - Assert.assertNull(mergeLatest(analysis2, analysis4).isRollup()); - Assert.assertFalse(mergeLatest(analysis2, analysis3).isRollup()); - Assert.assertTrue(mergeLatest(analysis4, analysis5).isRollup()); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2).rollup(false).build(); + final SegmentAnalysis analysis3 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).rollup(false).build(); + final SegmentAnalysis analysis4 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2).rollup(true).build(); + final SegmentAnalysis analysis5 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).rollup(true).build(); + + Assert.assertNull(mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy).isRollup()); + Assert.assertNull(mergeWithStrategy(analysis1, analysis4, aggregatorMergeStrategy).isRollup()); + Assert.assertNull(mergeWithStrategy(analysis2, analysis4, aggregatorMergeStrategy).isRollup()); + Assert.assertFalse(mergeWithStrategy(analysis2, analysis3, aggregatorMergeStrategy).isRollup()); + Assert.assertTrue(mergeWithStrategy(analysis4, analysis5, aggregatorMergeStrategy).isRollup()); } - @Test - public void testInvalidMergeAggregatorsWithNullOrEmptyDatasource() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testInvalidMergeAggregatorsWithNullOrEmptyDatasource(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - false - ); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2).build(); MatcherAssert.assertThat( Assert.assertThrows( @@ -1278,13 +534,10 @@ public void testInvalidMergeAggregatorsWithNullOrEmptyDatasource() null, analysis1, analysis2, - AggregatorMergeStrategy.STRICT + aggregatorMergeStrategy ) ), - DruidExceptionMatcher - .invalidInput() - .expectMessageIs( - "SegementMetadata queries require at least one datasource.") + DruidExceptionMatcher.defensive().expectMessageIs("SegementMetadata queries require at least one datasource.") ); MatcherAssert.assertThat( @@ -1294,201 +547,175 @@ public void testInvalidMergeAggregatorsWithNullOrEmptyDatasource() ImmutableSet.of(), analysis1, analysis2, - AggregatorMergeStrategy.STRICT + aggregatorMergeStrategy ) ), DruidExceptionMatcher - .invalidInput() + .defensive() .expectMessageIs( "SegementMetadata queries require at least one datasource.") ); } - - @Test - public void testMergeWithUnionDatasource() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeWithUnionDatasource(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleSumAggregatorFactory("bar", "bar") - ), - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - false - ); - - final SegmentAnalysis expectedMergedAnalysis = new SegmentAnalysis( - "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged", - null, - new LinkedHashMap<>(), - 0, - 0, - ImmutableMap.of( - "foo", new LongSumAggregatorFactory("foo", "foo"), - "bar", new DoubleMaxAggregatorFactory("bar", "bar"), - "baz", new LongMaxAggregatorFactory("baz", "baz") - ), - null, - null, - null - ); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + + final SegmentAnalysis expectedMergedAnalysis = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .aggregator("foo", new LongSumAggregatorFactory("foo", "foo")) + .aggregator("bar", new DoubleSumAggregatorFactory("bar", "bar")) + .build(); + + UnionDataSource dataSource1 = new UnionDataSource(ImmutableList.of( + new TableDataSource("foo"), + new TableDataSource("dummy") + )); + UnionDataSource dataSource2 = new UnionDataSource(ImmutableList.of( + new TableDataSource("dummy"), + new TableDataSource("foo"), + new TableDataSource("bar") + )); Assert.assertEquals( expectedMergedAnalysis, - SegmentMetadataQueryQueryToolChest.finalizeAnalysis( - SegmentMetadataQueryQueryToolChest.mergeAnalyses( - new UnionDataSource( - ImmutableList.of( - new TableDataSource("foo"), - new TableDataSource("dummy") - ) - ).getTableNames(), - analysis1, - analysis2, - AggregatorMergeStrategy.LATEST - ) - ) + SegmentMetadataQueryQueryToolChest.finalizeAnalysis(SegmentMetadataQueryQueryToolChest.mergeAnalyses( + dataSource1.getTableNames(), + analysis1, + analysis2, + aggregatorMergeStrategy + )) ); - Assert.assertEquals( expectedMergedAnalysis, SegmentMetadataQueryQueryToolChest.finalizeAnalysis( SegmentMetadataQueryQueryToolChest.mergeAnalyses( - new UnionDataSource( - ImmutableList.of( - new TableDataSource("dummy"), - new TableDataSource("foo"), - new TableDataSource("bar") - ) - ).getTableNames(), + dataSource2.getTableNames(), analysis1, analysis2, - AggregatorMergeStrategy.LATEST + aggregatorMergeStrategy ) ) ); } - @Test - public void testMergeWithNullAnalyses() + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testMergeWithNullAnalyses(AggregatorMergeStrategy aggregatorMergeStrategy) { - final SegmentAnalysis analysis1 = new SegmentAnalysis( - TEST_SEGMENT_ID1.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - null - ); - final SegmentAnalysis analysis2 = new SegmentAnalysis( - TEST_SEGMENT_ID2.toString(), - null, - new LinkedHashMap<>(), - 0, - 0, - null, - null, - null, - false - ); + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2).rollup(false).build(); - Assert.assertEquals( - analysis1, - SegmentMetadataQueryQueryToolChest - .mergeAnalyses(TEST_DATASOURCE.getTableNames(), analysis1, null, AggregatorMergeStrategy.STRICT) - ); - Assert.assertEquals( - analysis2, - SegmentMetadataQueryQueryToolChest - .mergeAnalyses(TEST_DATASOURCE.getTableNames(), null, analysis2, AggregatorMergeStrategy.STRICT) - ); + Assert.assertEquals(analysis1, mergeWithStrategy(analysis1, null, aggregatorMergeStrategy)); + Assert.assertEquals(analysis2, mergeWithStrategy(null, analysis2, aggregatorMergeStrategy)); Assert.assertNull( SegmentMetadataQueryQueryToolChest - .mergeAnalyses(TEST_DATASOURCE.getTableNames(), null, null, AggregatorMergeStrategy.STRICT) - ); - Assert.assertNull( - SegmentMetadataQueryQueryToolChest - .mergeAnalyses(TEST_DATASOURCE.getTableNames(), null, null, AggregatorMergeStrategy.LENIENT) - ); - Assert.assertNull( - SegmentMetadataQueryQueryToolChest - .mergeAnalyses(TEST_DATASOURCE.getTableNames(), null, null, AggregatorMergeStrategy.EARLIEST) - ); + .mergeAnalyses(TEST_DATASOURCE.getTableNames(), null, null, aggregatorMergeStrategy)); + } + + + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testProjections(AggregatorMergeStrategy aggregatorMergeStrategy) + { + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 100)) + .build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 200)) + .build(); + + final SegmentAnalysis expected = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 300)) + .build(); + Assert.assertEquals(expected, mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy)); + } + + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testProjectionsWithNull(AggregatorMergeStrategy aggregatorMergeStrategy) + { + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 100)) + .build(); + final SegmentAnalysis analysis1NullProjection = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1).build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 200)) + .build(); + final SegmentAnalysis analysis2NullProjection = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2).build(); + + Assert.assertNull(mergeWithStrategy(analysis1NullProjection, analysis2, aggregatorMergeStrategy).getProjections()); + Assert.assertNull(mergeWithStrategy(analysis1, analysis2NullProjection, aggregatorMergeStrategy).getProjections()); Assert.assertNull( - SegmentMetadataQueryQueryToolChest - .mergeAnalyses(TEST_DATASOURCE.getTableNames(), null, null, AggregatorMergeStrategy.LATEST) + mergeWithStrategy(analysis1NullProjection, analysis2NullProjection, aggregatorMergeStrategy).getProjections() ); } - private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2) + @EnumSource(AggregatorMergeStrategy.class) + @ParameterizedTest(name = "{index}: with AggregatorMergeStrategy {0}") + public void testProjectionsWithConflict(AggregatorMergeStrategy aggregatorMergeStrategy) + { + final SegmentAnalysis analysis1 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID1) + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 100)) + .projection("channel_sum_1", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 100)) + .projection("conflict_projection", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 100)) + .build(); + final SegmentAnalysis analysis2 = new SegmentAnalysis.Builder(TEST_SEGMENT_ID2) + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 200)) + .projection("channel_sum_2", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_DAILY, 200)) + .projection("conflict_projection", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_DAILY, 200)) + .build(); + + // conflict projection is ignored. + final SegmentAnalysis expectedStrict = new SegmentAnalysis.Builder( + "dummy_2021-01-01T00:00:00.000Z_2021-01-02T00:00:00.000Z_merged") + .projection("channel_sum", new AggregateProjectionMetadata(PROJECTION_CHANNEL_ADDED_HOURLY, 300)) + .build(); + Assert.assertEquals(expectedStrict, mergeWithStrategy(analysis1, analysis2, aggregatorMergeStrategy)); + } + + private static SegmentAnalysis mergeWithStrategy( + SegmentAnalysis analysis1, + SegmentAnalysis analysis2, + AggregatorMergeStrategy strategy + ) { return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( SegmentMetadataQueryQueryToolChest.mergeAnalyses( TEST_DATASOURCE.getTableNames(), analysis1, analysis2, - AggregatorMergeStrategy.STRICT - ) - ); + strategy + )); + } + + private static SegmentAnalysis mergeStrict(SegmentAnalysis analysis1, SegmentAnalysis analysis2) + { + return mergeWithStrategy(analysis1, analysis2, AggregatorMergeStrategy.STRICT); } private static SegmentAnalysis mergeLenient(SegmentAnalysis analysis1, SegmentAnalysis analysis2) { - return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( - SegmentMetadataQueryQueryToolChest.mergeAnalyses( - TEST_DATASOURCE.getTableNames(), - analysis1, - analysis2, - AggregatorMergeStrategy.LENIENT - ) - ); + return mergeWithStrategy(analysis1, analysis2, AggregatorMergeStrategy.LENIENT); } private static SegmentAnalysis mergeEarliest(SegmentAnalysis analysis1, SegmentAnalysis analysis2) { - return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( - SegmentMetadataQueryQueryToolChest.mergeAnalyses( - TEST_DATASOURCE.getTableNames(), - analysis1, - analysis2, - AggregatorMergeStrategy.EARLIEST - ) - ); + return mergeWithStrategy(analysis1, analysis2, AggregatorMergeStrategy.EARLIEST); } private static SegmentAnalysis mergeLatest(SegmentAnalysis analysis1, SegmentAnalysis analysis2) { - return SegmentMetadataQueryQueryToolChest.finalizeAnalysis( - SegmentMetadataQueryQueryToolChest.mergeAnalyses( - TEST_DATASOURCE.getTableNames(), - analysis1, - analysis2, - AggregatorMergeStrategy.LATEST - ) - ); + return mergeWithStrategy(analysis1, analysis2, AggregatorMergeStrategy.LATEST); } } diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java index 47decc388e2b..5fd5ea98d576 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataQueryTest.java @@ -62,6 +62,7 @@ import org.apache.druid.query.policy.NoRestrictionPolicy; import org.apache.druid.query.policy.RowFilterPolicy; import org.apache.druid.query.spec.LegacySegmentSpec; +import org.apache.druid.segment.AggregateProjectionMetadata; import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; @@ -102,6 +103,8 @@ public class SegmentMetadataQueryTest extends InitializedNullHandlingTest ); private static final ObjectMapper MAPPER = new DefaultObjectMapper(); private static final String DATASOURCE = "testDatasource"; + private static final AggregateProjectionMetadata.Schema PROJECTION_SCHEMA = TestIndex.PROJECTIONS.get(0).toMetadataSchema(); + private static final int PROJECTION_ROWS = 279; @SuppressWarnings("unchecked") public static QueryRunner makeMMappedQueryRunner( @@ -205,7 +208,8 @@ public SegmentMetadataQueryTest( SegmentMetadataQuery.AnalysisType.SIZE, SegmentMetadataQuery.AnalysisType.INTERVAL, SegmentMetadataQuery.AnalysisType.MINMAX, - SegmentMetadataQuery.AnalysisType.AGGREGATORS + SegmentMetadataQuery.AnalysisType.AGGREGATORS, + SegmentMetadataQuery.AnalysisType.PROJECTIONS ) .merge(true) .build(); @@ -221,6 +225,10 @@ public SegmentMetadataQueryTest( for (AggregatorFactory agg : TestIndex.METRIC_AGGS) { expectedAggregators.put(agg.getName(), agg.getCombiningFactory()); } + final Map expectedProjections = ImmutableMap.of( + PROJECTION_SCHEMA.getName(), + new AggregateProjectionMetadata(PROJECTION_SCHEMA, PROJECTION_ROWS) + ); expectedSegmentAnalysis1 = new SegmentAnalysis( id1.toString(), @@ -268,6 +276,7 @@ public SegmentMetadataQueryTest( overallSize, 1209, expectedAggregators, + expectedProjections, null, null, null @@ -318,6 +327,7 @@ public SegmentMetadataQueryTest( overallSize, 1209, expectedAggregators, + expectedProjections, null, null, null @@ -399,6 +409,7 @@ public void testSegmentMetadataQueryWithRollupMerge() null, null, null, + null, rollup1 != rollup2 ? null : rollup1 ); @@ -473,6 +484,7 @@ public void testSegmentMetadataQueryWithHasMultipleValuesMerge() null, null, null, + null, null ); @@ -547,6 +559,7 @@ public void testSegmentMetadataQueryWithComplexColumnMerge() null, null, null, + null, null ); @@ -691,6 +704,10 @@ private void testSegmentMetadataQueryWithDefaultAnalysisMerge( expectedSegmentAnalysis1.getSize() + expectedSegmentAnalysis2.getSize(), expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), expectedAggregators, + ImmutableMap.of( + PROJECTION_SCHEMA.getName(), + new AggregateProjectionMetadata(PROJECTION_SCHEMA, PROJECTION_ROWS * 2) + ), null, null, null @@ -749,6 +766,7 @@ public void testSegmentMetadataQueryWithNoAnalysisTypesMerge() null, null, null, + null, null ); @@ -815,6 +833,7 @@ public void testSegmentMetadataQueryWithAggregatorsMerge() expectedAggregators, null, null, + null, null ); @@ -881,6 +900,7 @@ public void testSegmentMetadataQueryWithAggregatorsMergeLenientStrategy() expectedAggregators, null, null, + null, null ); @@ -942,6 +962,7 @@ public void testSegmentMetadataQueryWithTimestampSpecMerge() 0, expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, + null, new TimestampSpec("ts", "iso", null), null, null @@ -1005,6 +1026,7 @@ public void testSegmentMetadataQueryWithQueryGranularityMerge() expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(), null, null, + null, Granularities.NONE, null ); diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java index 33147ce01b0e..7082dd744ed4 100644 --- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentMetadataUnionQueryTest.java @@ -122,6 +122,7 @@ public void testSegmentMetadataUnionQuery() null, null, null, + null, null ); SegmentMetadataQuery query = new Druids.SegmentMetadataQueryBuilder() diff --git a/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java b/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java index 636df8691b39..1b2018994bdd 100644 --- a/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java +++ b/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java @@ -83,9 +83,7 @@ public void testComparator() "theTime", VirtualColumns.create(Granularities.toVirtualColumn(Granularities.HOUR, "theTime")), Arrays.asList("theTime", "a", "b", "c"), - new AggregatorFactory[] { - new CountAggregatorFactory("chocula") - }, + new AggregatorFactory[]{new CountAggregatorFactory("chocula")}, Arrays.asList( OrderBy.ascending("theTime"), OrderBy.ascending("a"), @@ -95,14 +93,30 @@ public void testComparator() ), 123 ); - // same row count, but more aggs more better - AggregateProjectionMetadata better = new AggregateProjectionMetadata( + // same row count, but less grouping columns aggs more better + AggregateProjectionMetadata betterLessGroupingColumns = new AggregateProjectionMetadata( new AggregateProjectionMetadata.Schema( - "better", + "betterLessGroupingColumns", "theTime", VirtualColumns.create(Granularities.toVirtualColumn(Granularities.HOUR, "theTime")), Arrays.asList("c", "d", "theTime"), - new AggregatorFactory[] { + new AggregatorFactory[]{new CountAggregatorFactory("chocula")}, + Arrays.asList( + OrderBy.ascending("c"), + OrderBy.ascending("d"), + OrderBy.ascending("theTime") + ) + ), + 123 + ); + // same grouping columns, but more aggregators + AggregateProjectionMetadata evenBetterMoreAggs = new AggregateProjectionMetadata( + new AggregateProjectionMetadata.Schema( + "evenBetterMoreAggs", + "theTime", + VirtualColumns.create(Granularities.toVirtualColumn(Granularities.HOUR, "theTime")), + Arrays.asList("c", "d", "theTime"), + new AggregatorFactory[]{ new CountAggregatorFactory("chocula"), new LongSumAggregatorFactory("e", "e") }, @@ -114,11 +128,10 @@ public void testComparator() ), 123 ); - // small rows is best AggregateProjectionMetadata best = new AggregateProjectionMetadata( new AggregateProjectionMetadata.Schema( - "better", + "best", null, VirtualColumns.EMPTY, Arrays.asList("f", "g"), @@ -128,10 +141,14 @@ public void testComparator() 10 ); metadataBest.add(good); - metadataBest.add(better); + metadataBest.add(betterLessGroupingColumns); + metadataBest.add(evenBetterMoreAggs); metadataBest.add(best); Assert.assertEquals(best, metadataBest.first()); - Assert.assertEquals(good, metadataBest.last()); + Assert.assertArrayEquals( + new AggregateProjectionMetadata[]{best, evenBetterMoreAggs, betterLessGroupingColumns, good}, + metadataBest.toArray() + ); } @Test @@ -141,12 +158,12 @@ public void testInvalidGrouping() DruidException.class, () -> new AggregateProjectionMetadata( new AggregateProjectionMetadata.Schema( - "other_projection", - null, - null, - null, - null, - null + "other_projection", + null, + null, + null, + null, + null ), 0 ) @@ -157,12 +174,12 @@ public void testInvalidGrouping() DruidException.class, () -> new AggregateProjectionMetadata( new AggregateProjectionMetadata.Schema( - "other_projection", - null, - null, - Collections.emptyList(), - null, - null + "other_projection", + null, + null, + Collections.emptyList(), + null, + null ), 0 ) diff --git a/processing/src/test/java/org/apache/druid/segment/MetadataTest.java b/processing/src/test/java/org/apache/druid/segment/MetadataTest.java index 5d06d2ab8c25..23318406bb47 100644 --- a/processing/src/test/java/org/apache/druid/segment/MetadataTest.java +++ b/processing/src/test/java/org/apache/druid/segment/MetadataTest.java @@ -66,8 +66,18 @@ public void testSerde() throws Exception null ); + // Empty projection is not included in the json object + Metadata metadataWithEmptyProjection = new Metadata( + Collections.singletonMap("k", "v"), + aggregators, + null, + Granularities.ALL, + Boolean.FALSE, + null, + ImmutableList.of() + ); Metadata other = jsonMapper.readValue( - jsonMapper.writeValueAsString(metadata), + jsonMapper.writeValueAsString(metadataWithEmptyProjection), Metadata.class ); diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java index 83f1ec476495..bb9c52b2d200 100644 --- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java +++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java @@ -27,6 +27,7 @@ import com.google.common.io.Resources; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.ResourceInputSource; +import org.apache.druid.data.input.impl.AggregateProjectionSpec; import org.apache.druid.data.input.impl.DelimitedParseSpec; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -40,6 +41,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -179,6 +181,17 @@ public class TestIndex new DoubleMaxAggregatorFactory(DOUBLE_METRICS[2], VIRTUAL_COLUMNS.getVirtualColumns()[0].getOutputName()), new HyperUniquesAggregatorFactory("quality_uniques", "quality") }; + public static final ImmutableList PROJECTIONS = ImmutableList.of( + new AggregateProjectionSpec( + "index_projection", + VirtualColumns.create(Granularities.toVirtualColumn(Granularities.DAY, "__gran")), + Arrays.asList( + new LongDimensionSchema("__gran"), + new StringDimensionSchema("market") + ), + new AggregatorFactory[]{new DoubleMaxAggregatorFactory("maxQuality", "qualityLong")} + ) + ); public static final IndexSpec INDEX_SPEC = IndexSpec.DEFAULT; public static final JsonInputFormat DEFAULT_JSON_INPUT_FORMAT = new JsonInputFormat( @@ -406,9 +419,14 @@ private static IncrementalIndex fromJsonResource( new IncrementalIndexSchema.Builder() .withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis()) .withTimestampSpec(new TimestampSpec("ts", "iso", null)) - .withDimensionsSpec(DimensionsSpec.builder().setDimensions(dimensionsSpec.getDimensions()).setDimensionExclusions(ImmutableList.of("index")).setIncludeAllDimensions(true).build()) + .withDimensionsSpec(DimensionsSpec.builder() + .setDimensions(dimensionsSpec.getDimensions()) + .setDimensionExclusions(ImmutableList.of("index")) + .setIncludeAllDimensions(true) + .build()) .withVirtualColumns(VIRTUAL_COLUMNS) .withMetrics(METRIC_AGGS) + .withProjections(PROJECTIONS) .withRollup(rollup) .build(), DEFAULT_JSON_INPUT_FORMAT @@ -523,6 +541,7 @@ public static IncrementalIndex makeIncrementalIndexFromTsvCharSource(final CharS .withDimensionsSpec(DIMENSIONS_SPEC) .withVirtualColumns(VIRTUAL_COLUMNS) .withMetrics(METRIC_AGGS) + .withProjections(PROJECTIONS) .withRollup(true) .build(); final IncrementalIndex retVal = new OnheapIncrementalIndex.Builder() diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 8d63da5f757e..a62114a62a50 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -1113,6 +1113,7 @@ public void testSegmentMetadataColumnType() null, null, null, + null, null ) ); @@ -1179,6 +1180,7 @@ public void testSegmentMetadataFallbackType() null, null, null, + null, null ) );