Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions common/src/main/java/io/druid/common/guava/CombiningSequence.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.common.guava;

import com.google.common.base.Function;
import com.google.common.collect.Ordering;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
Expand All @@ -37,25 +38,29 @@ public class CombiningSequence<T> implements Sequence<T>
public static <T> CombiningSequence<T> create(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
BinaryFn<T, T, T> mergeFn,
Function transformFn
)
{
return new CombiningSequence<T>(baseSequence, ordering, mergeFn);
return new CombiningSequence<T>(baseSequence, ordering, mergeFn, transformFn);
}

private final Sequence<T> baseSequence;
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;
private final Function transformFn;

public CombiningSequence(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
BinaryFn<T, T, T> mergeFn,
Function transformFn
)
{
this.baseSequence = baseSequence;
this.ordering = ordering;
this.mergeFn = mergeFn;
this.transformFn = transformFn;
}

@Override
Expand Down Expand Up @@ -117,6 +122,9 @@ public <OutType, T> Yielder<OutType> makeYielder(
/**
 * Returns {@code retVal}, first passing it through {@code transformFn} when one is set.
 */
@Override
public OutType get()
{
  // No transform configured: hand back the value untouched.
  if (transformFn == null) {
    return retVal;
  }
  return (OutType) transformFn.apply(retVal);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public Pair<Integer, Integer> apply(

return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
}
}
},
null
);

List<Pair<Integer, Integer>> merged = Sequences.toList(seq, Lists.<Pair<Integer, Integer>>newArrayList());
Expand Down
8 changes: 6 additions & 2 deletions docs/content/querying/segmentmetadataquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ There are several main parts to a segment metadata query:
|toInclude|A JSON Object representing what columns should be included in the result. Defaults to "all".|no|
|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.html)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size"]. See section [analysisTypes](#analysistypes) for more details.|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "size", "interval"]. See section [analysisTypes](#analysistypes) for more details.|no|

The format of the result is:

Expand Down Expand Up @@ -96,7 +96,7 @@ This is a list of properties that determines the amount of information returned

By default, all analysis types will be used. If a property is not needed, omitting it from this list will result in a more efficient query.

There are 2 types of column analyses:
There are 3 types of column analyses:

#### cardinality

Expand All @@ -107,3 +107,7 @@ There are 2 types of column analyses:
* Estimated byte size for the segment columns if they were stored in a flat format

* Estimated total segment byte size if it was stored in a flat format

#### interval

* If present, the SegmentMetadataQuery will return the list of intervals associated with the queried segments.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public ResultMergeQueryRunner(
@Override
public Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context)
{
return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query));
return CombiningSequence.create(baseRunner.run(query, context), makeOrdering(query), createMergeFn(query), null);
}

protected abstract Ordering<T> makeOrdering(Query<T> query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,8 @@ private boolean analysisHasCardinality(EnumSet<SegmentMetadataQuery.AnalysisType
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
}

/**
 * Reports whether the given set of analysis types contains
 * {@link SegmentMetadataQuery.AnalysisType#INTERVAL}.
 *
 * NOTE(review): the method name looks like a typo for "analysisHasInterval"; renaming it
 * requires updating every call site in the same change.
 *
 * @param analysisTypes the analysis types to inspect
 * @return true if INTERVAL is present in {@code analysisTypes}
 */
private boolean analysisHasInterva(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes) {
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.INTERVAL);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.common.guava.CombiningSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Row;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
Expand Down Expand Up @@ -77,6 +79,36 @@ public QueryRunner<SegmentAnalysis> mergeResults(final QueryRunner<SegmentAnalys
{
return new ResultMergeQueryRunner<SegmentAnalysis>(runner)
{
/**
 * Rebuilds a {@link SegmentAnalysis} with its interval list condensed via
 * {@link JodaUtils#condenseIntervals}; id, columns, size and row count are copied unchanged.
 * A null interval list is passed through as null instead of being condensed.
 */
private Function<SegmentAnalysis, SegmentAnalysis> transformFn = new Function<SegmentAnalysis, SegmentAnalysis>()
{
  @Override
  public SegmentAnalysis apply(SegmentAnalysis analysis)
  {
    // Intervals are null when the query did not request the INTERVAL analysis type,
    // so guard before condensing to avoid a NullPointerException.
    final List<Interval> intervals = analysis.getIntervals();
    return new SegmentAnalysis(
        analysis.getId(),
        intervals == null ? null : JodaUtils.condenseIntervals(intervals),
        analysis.getColumns(),
        analysis.getSize(),
        analysis.getNumRows()
    );
  }
};

/**
 * Wraps the base runner's output in a {@link CombiningSequence} built with this runner's
 * ordering, merge function and {@code transformFn}.
 *
 * @param baseRunner runner whose results are combined
 * @param query      the query being executed
 * @param context    context map handed to the base runner
 * @return the combining sequence over the base runner's results
 */
@Override
public Sequence<SegmentAnalysis> doRun(
    QueryRunner<SegmentAnalysis> baseRunner,
    Query<SegmentAnalysis> query,
    Map<String, Object> context
)
{
  // Evaluated in the same order as the original argument list.
  final Sequence<SegmentAnalysis> baseResults = baseRunner.run(query, context);
  final Ordering<SegmentAnalysis> resultOrdering = makeOrdering(query);
  final BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> merger = createMergeFn(query);

  return CombiningSequence.create(baseResults, resultOrdering, merger, transformFn);
}

@Override
protected Ordering<SegmentAnalysis> makeOrdering(Query<SegmentAnalysis> query)
{
Expand Down Expand Up @@ -115,9 +147,11 @@ public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
return arg1;
}

List<Interval> newIntervals = JodaUtils.condenseIntervals(
Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
);
List<Interval> newIntervals = null;
if (query.hasInterval()) {
newIntervals = arg1.getIntervals();
newIntervals.addAll(arg2.getIntervals());
}

final Map<String, ColumnAnalysis> leftColumns = arg1.getColumns();
final Map<String, ColumnAnalysis> rightColumns = arg2.getColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -117,12 +119,13 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
columns.put(columnName, column);
}
}
List<Interval> retIntervals = query.hasInterval() ? Arrays.asList(segment.getDataInterval()) : null;

return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
segment.getIdentifier(),
Arrays.asList(segment.getDataInterval()),
retIntervals,
columns,
totalSize,
numRows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
public enum AnalysisType
{
CARDINALITY,
SIZE;
SIZE,
INTERVAL;

@JsonValue
@Override
Expand All @@ -77,7 +78,8 @@ public byte[] getCacheKey()

public static final EnumSet<AnalysisType> DEFAULT_ANALYSIS_TYPES = EnumSet.of(
AnalysisType.CARDINALITY,
AnalysisType.SIZE
AnalysisType.SIZE,
AnalysisType.INTERVAL
);

private final ColumnIncluderator toInclude;
Expand Down Expand Up @@ -163,6 +165,11 @@ public boolean hasSize()
return analysisTypes.contains(AnalysisType.SIZE);
}

/**
 * Reports whether this query's analysis types include {@link AnalysisType#INTERVAL}.
 *
 * @return true if INTERVAL is among {@code analysisTypes}
 */
public boolean hasInterval()
{
return analysisTypes.contains(AnalysisType.INTERVAL);
}

public byte[] getAnalysisTypesCacheKey()
{
int size = 1;
Expand Down Expand Up @@ -259,6 +266,10 @@ public boolean equals(Object o)
if (usingDefaultInterval != that.usingDefaultInterval) {
return false;
}

if (!analysisTypes.equals(that.analysisTypes)) {
return false;
}
return !(toInclude != null ? !toInclude.equals(that.toInclude) : that.toInclude != null);

}
Expand All @@ -270,6 +281,7 @@ public int hashCode()
result = 31 * result + (toInclude != null ? toInclude.hashCode() : 0);
result = 31 * result + (merge ? 1 : 0);
result = 31 * result + (usingDefaultInterval ? 1 : 0);
result = 31 * result + analysisTypes.hashCode();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testCacheStrategy() throws Exception
new SegmentMetadataQueryQueryToolChest(null).getCacheStrategy(query);

// Test cache key generation
byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01};
byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01, 0x02};
byte[] actualKey = strategy.computeCacheKey(query);
Assert.assertArrayEquals(expectedKey, actualKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,7 @@ protected int persistHydrant(

final File persistedFile = indexMerger.persist(
indexToPersist.getIndex(),
interval,
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
metaData,
indexSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,21 +396,23 @@ public void run()
Map<Long, Sink> sinks = restoredPlumber.getSinks();
Assert.assertEquals(1, sinks.size());


List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(new Long(0)));
DateTime startTime = new DateTime("1970-01-01T00:00:00.000Z");
Interval expectedInterval = new Interval(startTime, new DateTime("1971-01-01T00:00:00.000Z"));
Assert.assertEquals(0, hydrants.get(0).getCount());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-01-01T00:00:00.001Z")),
expectedInterval,
hydrants.get(0).getSegment().getDataInterval()
);
Assert.assertEquals(2, hydrants.get(1).getCount());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-03-01T00:00:00.001Z")),
expectedInterval,
hydrants.get(1).getSegment().getDataInterval()
);
Assert.assertEquals(4, hydrants.get(2).getCount());
Assert.assertEquals(
new Interval(startTime, new DateTime("1970-05-01T00:00:00.001Z")),
expectedInterval,
hydrants.get(2).getSegment().getDataInterval()
);

Expand Down