-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Add support for a configurable default segment history period for segmentMetadata queries #1726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /* | ||
| * Licensed to Metamarkets Group Inc. (Metamarkets) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. Metamarkets licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package io.druid.query.metadata; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import org.joda.time.Period; | ||
| import org.joda.time.format.ISOPeriodFormat; | ||
| import org.joda.time.format.PeriodFormatter; | ||
|
|
||
|
|
||
| public class SegmentMetadataQueryConfig | ||
| { | ||
| private static final String DEFAULT_PERIOD_STRING = "P1W"; | ||
| private static final PeriodFormatter ISO_FORMATTER = ISOPeriodFormat.standard(); | ||
|
|
||
| @JsonProperty | ||
| private Period defaultHistory = ISO_FORMATTER.parsePeriod(DEFAULT_PERIOD_STRING); | ||
|
|
||
| public SegmentMetadataQueryConfig(String period) | ||
| { | ||
| defaultHistory = ISO_FORMATTER.parsePeriod(period); | ||
| } | ||
|
|
||
| public SegmentMetadataQueryConfig() | ||
| { | ||
| } | ||
|
|
||
| public Period getDefaultHistory() | ||
| { | ||
| return defaultHistory; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,10 +20,13 @@ | |
| import com.fasterxml.jackson.core.type.TypeReference; | ||
| import com.google.common.base.Function; | ||
| import com.google.common.base.Functions; | ||
| import com.google.common.base.Predicate; | ||
| import com.google.common.collect.Iterables; | ||
| import com.google.common.collect.Lists; | ||
| import com.google.common.collect.Maps; | ||
| import com.google.common.collect.Ordering; | ||
| import com.google.common.collect.Sets; | ||
| import com.google.inject.Inject; | ||
| import com.metamx.common.ISE; | ||
| import com.metamx.common.guava.MergeSequence; | ||
| import com.metamx.common.guava.Sequence; | ||
|
|
@@ -41,6 +44,8 @@ | |
| import io.druid.query.metadata.metadata.ColumnAnalysis; | ||
| import io.druid.query.metadata.metadata.SegmentAnalysis; | ||
| import io.druid.query.metadata.metadata.SegmentMetadataQuery; | ||
| import io.druid.timeline.LogicalSegment; | ||
| import org.joda.time.DateTime; | ||
| import org.joda.time.Interval; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
@@ -56,6 +61,16 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn | |
| }; | ||
| private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4}; | ||
|
|
||
| private final SegmentMetadataQueryConfig config; | ||
|
|
||
| @Inject | ||
| public SegmentMetadataQueryQueryToolChest( | ||
| SegmentMetadataQueryConfig config | ||
| ) | ||
| { | ||
| this.config = config; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Storing either the config or the defaultHistory period should be enough- doesn't look like there's a need to store both |
||
| } | ||
|
|
||
| @Override | ||
| public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalysis> runner) | ||
| { | ||
|
|
@@ -216,6 +231,37 @@ public Sequence<SegmentAnalysis> mergeSequences(Sequence<Sequence<SegmentAnalysi | |
| }; | ||
| } | ||
|
|
||
| @Override | ||
| public <T extends LogicalSegment> List<T> filterSegments(SegmentMetadataQuery query, List<T> segments) | ||
| { | ||
| if (!query.isUsingDefaultInterval()) { | ||
| return segments; | ||
| } | ||
|
|
||
| if (segments.size() <= 1) { | ||
| return segments; | ||
| } | ||
|
|
||
| final T max = segments.get(segments.size() - 1); | ||
|
|
||
| DateTime targetEnd = max.getInterval().getEnd(); | ||
| final Interval targetInterval = new Interval(config.getDefaultHistory(), targetEnd); | ||
|
|
||
| return Lists.newArrayList( | ||
| Iterables.filter( | ||
| segments, | ||
| new Predicate<T>() | ||
| { | ||
| @Override | ||
| public boolean apply(T input) | ||
| { | ||
| return (input.getInterval().overlaps(targetInterval)); | ||
| } | ||
| } | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| private Ordering<SegmentAnalysis> getOrdering() | ||
| { | ||
| return new Ordering<SegmentAnalysis>() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,33 +20,57 @@ | |
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.google.common.base.Preconditions; | ||
| import io.druid.common.utils.JodaUtils; | ||
| import io.druid.query.BaseQuery; | ||
| import io.druid.query.DataSource; | ||
| import io.druid.query.Query; | ||
| import io.druid.query.TableDataSource; | ||
| import io.druid.query.spec.MultipleIntervalSegmentSpec; | ||
| import io.druid.query.spec.QuerySegmentSpec; | ||
| import org.joda.time.Interval; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.Map; | ||
|
|
||
| public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis> | ||
| { | ||
| public static final Interval DEFAULT_INTERVAL = new Interval( | ||
| JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT | ||
| ); | ||
|
|
||
| private final ColumnIncluderator toInclude; | ||
| private final boolean merge; | ||
| private final boolean usingDefaultInterval; | ||
|
|
||
| @JsonCreator | ||
| public SegmentMetadataQuery( | ||
| @JsonProperty("dataSource") DataSource dataSource, | ||
| @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, | ||
| @JsonProperty("toInclude") ColumnIncluderator toInclude, | ||
| @JsonProperty("merge") Boolean merge, | ||
| @JsonProperty("context") Map<String, Object> context | ||
| @JsonProperty("context") Map<String, Object> context, | ||
| @JsonProperty("usingDefaultInterval") Boolean useDefaultInterval | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this flag needs part of the json ?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The querySegmentSpec is always going to be set after the first object construction (constructor sets it to DEFAULT_INTERVAL if it's null) so this needs to be part of the json if the query is ever going to be constructed and then re-serialized. That could potentially happen if someone creates the Query object locally and then serializes it to send to the broker.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The flag needs to be in the constructor, in the situations where a query object is generated from an existing object, e.g. query.withOverriddenContext, query.withQuerySegmentSpec, the querySegmentSpec will not be null. |
||
| ) | ||
| { | ||
| super(dataSource, querySegmentSpec, context); | ||
| super( | ||
| dataSource, | ||
| (querySegmentSpec == null) ? new MultipleIntervalSegmentSpec(Arrays.asList(DEFAULT_INTERVAL)) | ||
| : querySegmentSpec, | ||
| context | ||
| ); | ||
|
|
||
| if (querySegmentSpec == null) { | ||
| this.usingDefaultInterval = true; | ||
| } else { | ||
| this.usingDefaultInterval = useDefaultInterval == null ? false : useDefaultInterval; | ||
| } | ||
|
|
||
| this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude; | ||
| this.merge = merge == null ? false : merge; | ||
| Preconditions.checkArgument(dataSource instanceof TableDataSource, "SegmentMetadataQuery only supports table datasource"); | ||
| Preconditions.checkArgument( | ||
| dataSource instanceof TableDataSource, | ||
| "SegmentMetadataQuery only supports table datasource" | ||
| ); | ||
| } | ||
|
|
||
| @JsonProperty | ||
|
|
@@ -61,6 +85,12 @@ public boolean isMerge() | |
| return merge; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public boolean isUsingDefaultInterval() | ||
| { | ||
| return usingDefaultInterval; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasFilters() | ||
| { | ||
|
|
@@ -78,7 +108,11 @@ public Query<SegmentAnalysis> withOverriddenContext(Map<String, Object> contextO | |
| { | ||
| return new SegmentMetadataQuery( | ||
| getDataSource(), | ||
| getQuerySegmentSpec(), toInclude, merge, computeOverridenContext(contextOverride) | ||
| getQuerySegmentSpec(), | ||
| toInclude, | ||
| merge, | ||
| computeOverridenContext(contextOverride), | ||
| usingDefaultInterval | ||
| ); | ||
| } | ||
|
|
||
|
|
@@ -87,7 +121,12 @@ public Query<SegmentAnalysis> withQuerySegmentSpec(QuerySegmentSpec spec) | |
| { | ||
| return new SegmentMetadataQuery( | ||
| getDataSource(), | ||
| spec, toInclude, merge, getContext()); | ||
| spec, | ||
| toInclude, | ||
| merge, | ||
| getContext(), | ||
| usingDefaultInterval | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -98,22 +137,34 @@ public Query<SegmentAnalysis> withDataSource(DataSource dataSource) | |
| getQuerySegmentSpec(), | ||
| toInclude, | ||
| merge, | ||
| getContext()); | ||
| getContext(), | ||
| usingDefaultInterval | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should include the new field - you can regenerate this and hashCode with IntelliJ
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gianm functions regenerated |
||
| { | ||
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| if (!super.equals(o)) return false; | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| if (!super.equals(o)) { | ||
| return false; | ||
| } | ||
|
|
||
| SegmentMetadataQuery that = (SegmentMetadataQuery) o; | ||
|
|
||
| if (merge != that.merge) return false; | ||
| if (toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null) return false; | ||
| if (merge != that.merge) { | ||
| return false; | ||
| } | ||
| if (usingDefaultInterval != that.usingDefaultInterval) { | ||
| return false; | ||
| } | ||
| return !(toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null); | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -122,6 +173,7 @@ public int hashCode() | |
| int result = super.hashCode(); | ||
| result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0); | ||
| result = 31 * result + (merge ? 1 : 0); | ||
| result = 31 * result + (usingDefaultInterval ? 1 : 0); | ||
| return result; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use the header format described in https://github.com/druid-io/druid/blob/75a582974b8d3305981af30e691c67dee35acb63/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java for example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fjy header is changed